abhinavgautam01 commented on code in PR #1685:
URL: 
https://github.com/apache/datafusion-ballista/pull/1685#discussion_r3223128736


##########
ballista/executor/src/standalone.rs:
##########
@@ -146,8 +259,108 @@ pub async fn new_standalone_executor_from_builder(
     Ok(())
 }
 
-/// Creates standalone executor with most values
-/// set as default.
+async fn push_staged_standalone_executor(
+    scheduler: SchedulerGrpcClient<Channel>,
+    concurrent_tasks: usize,
+    config_producer: ConfigProducer,
+    runtime_producer: RuntimeProducer,
+    codec: BallistaCodec,
+    function_registry: BallistaFunctionRegistry,
+) -> Result<()> {
+    let flight_listener = TcpListener::bind("localhost:0").await?;
+    let flight_addr = flight_listener.local_addr()?;
+    info!(
+        "Ballista v{BALLISTA_VERSION} Rust Executor (push) listening on 
{flight_addr:?}"
+    );
+
+    let grpc_probe = TcpListener::bind("127.0.0.1:0").await?;
+    let grpc_port = grpc_probe.local_addr()?.port();
+    drop(grpc_probe);
+
+    let executor_meta = ExecutorRegistration {
+        id: Uuid::new_v4().to_string(),
+        host: Some("localhost".to_string()),
+        port: flight_addr.port() as u32,
+        grpc_port: grpc_port as u32,
+        specification: Some(
+            ExecutorSpecification::default()
+                .with_task_slots(concurrent_tasks as u32)
+                .into(),
+        ),
+        os_info: Some(ExecutorOperatingSystemSpecification::default().into()),
+    };
+
+    let config_snap = config_producer();
+    let max_message_sz = config_snap.ballista_grpc_client_max_message_size() 
as u32;
+
+    let work_dir = TempDir::new()?.path().to_str().unwrap().to_string();
+    info!("work_dir: {work_dir}");
+
+    let executor = Arc::new(Executor::with_default_execution_engine(
+        executor_meta,
+        &work_dir,
+        runtime_producer,
+        config_producer.clone(),
+        Arc::new(function_registry),
+        Arc::new(LoggingMetricsCollector::default()),
+        concurrent_tasks,
+    ));
+
+    let service = BallistaFlightService::new(work_dir);
+    let server = FlightServiceServer::new(service)
+        .max_decoding_message_size(max_message_sz as usize)
+        .max_encoding_message_size(max_message_sz as usize);
+
+    tokio::spawn(
+        create_grpc_server(&GrpcServerConfig::default())
+            .add_service(server)
+            
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(
+                flight_listener,
+            )),
+    );
+
+    let exec_cfg = ExecutorProcessConfig {
+        bind_host: "127.0.0.1".into(),
+        port: flight_addr.port(),
+        grpc_port,
+        concurrent_tasks,
+        task_scheduling_policy: TaskSchedulingPolicy::PushStaged,
+        grpc_max_decoding_message_size: max_message_sz,
+        grpc_max_encoding_message_size: max_message_sz,
+        ..ExecutorProcessConfig::default()
+    };
+
+    let shutdown_notifier: &'static ShutdownNotifier =
+        Box::leak(Box::new(ShutdownNotifier::new()));
+    let (stop_send, _stop_recv) = mpsc::channel::<bool>(10);

Review Comment:
   Agreed, that would close the channel, and later sends on `stop_send` could 
fail. There is now a lightweight task that calls `recv()` in a loop so the 
`Sender` stays usable...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to