abhinavgautam01 commented on code in PR #1685:
URL:
https://github.com/apache/datafusion-ballista/pull/1685#discussion_r3223132557
##########
ballista/executor/src/standalone.rs:
##########
@@ -146,8 +259,108 @@ pub async fn new_standalone_executor_from_builder(
Ok(())
}
-/// Creates standalone executor with most values
-/// set as default.
+async fn push_staged_standalone_executor(
+ scheduler: SchedulerGrpcClient<Channel>,
+ concurrent_tasks: usize,
+ config_producer: ConfigProducer,
+ runtime_producer: RuntimeProducer,
+ codec: BallistaCodec,
+ function_registry: BallistaFunctionRegistry,
+) -> Result<()> {
+ let flight_listener = TcpListener::bind("localhost:0").await?;
+ let flight_addr = flight_listener.local_addr()?;
+ info!(
+ "Ballista v{BALLISTA_VERSION} Rust Executor (push) listening on
{flight_addr:?}"
+ );
+
+ let grpc_probe = TcpListener::bind("127.0.0.1:0").await?;
+ let grpc_port = grpc_probe.local_addr()?.port();
+ drop(grpc_probe);
+
+ let executor_meta = ExecutorRegistration {
+ id: Uuid::new_v4().to_string(),
+ host: Some("localhost".to_string()),
+ port: flight_addr.port() as u32,
+ grpc_port: grpc_port as u32,
+ specification: Some(
+ ExecutorSpecification::default()
+ .with_task_slots(concurrent_tasks as u32)
+ .into(),
+ ),
+ os_info: Some(ExecutorOperatingSystemSpecification::default().into()),
+ };
+
+ let config_snap = config_producer();
+ let max_message_sz = config_snap.ballista_grpc_client_max_message_size()
as u32;
+
+ let work_dir = TempDir::new()?.path().to_str().unwrap().to_string();
+ info!("work_dir: {work_dir}");
+
+ let executor = Arc::new(Executor::with_default_execution_engine(
+ executor_meta,
+ &work_dir,
+ runtime_producer,
+ config_producer.clone(),
+ Arc::new(function_registry),
+ Arc::new(LoggingMetricsCollector::default()),
+ concurrent_tasks,
+ ));
+
+ let service = BallistaFlightService::new(work_dir);
+ let server = FlightServiceServer::new(service)
+ .max_decoding_message_size(max_message_sz as usize)
+ .max_encoding_message_size(max_message_sz as usize);
+
+ tokio::spawn(
+ create_grpc_server(&GrpcServerConfig::default())
+ .add_service(server)
+
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(
+ flight_listener,
+ )),
+ );
+
+ let exec_cfg = ExecutorProcessConfig {
+ bind_host: "127.0.0.1".into(),
+ port: flight_addr.port(),
+ grpc_port,
+ concurrent_tasks,
+ task_scheduling_policy: TaskSchedulingPolicy::PushStaged,
+ grpc_max_decoding_message_size: max_message_sz,
+ grpc_max_encoding_message_size: max_message_sz,
+ ..ExecutorProcessConfig::default()
+ };
+
+ let shutdown_notifier: &'static ShutdownNotifier =
+ Box::leak(Box::new(ShutdownNotifier::new()));
Review Comment:
Removed `Box::leak`. The standalone executor now uses
`Arc::new(ShutdownNotifier::new())` and keeps one `Arc` clone alive inside the
task that awaits the server handle, so the notifier isn’t freed while gRPC is
still running — no leak and no process-global static state.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]