avantgardnerio commented on code in PR #153:
URL: https://github.com/apache/arrow-ballista/pull/153#discussion_r954045554


##########
ballista/rust/scheduler/src/scheduler_server/grpc.rs:
##########
@@ -514,16 +514,15 @@ mod test {
     #[tokio::test]
     async fn test_poll_work() -> Result<(), BallistaError> {
         let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
-        let namespace = "default";
         let scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::new(
+                "localhost:50050".to_owned(),
                 state_storage.clone(),
-                namespace.to_owned(),
                 BallistaCodec::default(),
             );
         let exec_meta = ExecutorRegistration {
             id: "abc".to_owned(),
-            optional_host: 
Some(OptionalHost::Host("http://host:8080".to_owned())),
+            optional_host: 
Some(OptionalHost::Host("http://localhost:8080".to_owned())),

Review Comment:
   I'm a bit wary of `localhost` here as it will sometimes resolve to IPv6 
addresses. Is the rest of the stack capable of supporting those if that is the 
result of the resolution? `127.0.0.1` might be slightly safer.



##########
ballista/rust/scheduler/src/main.rs:
##########
@@ -158,12 +158,13 @@ async fn main() -> Result<()> {
 
     let special_mod_log_level = opt.log_level_setting;
     let namespace = opt.namespace;
+    let external_host = opt.external_host;
     let bind_host = opt.bind_host;
     let port = opt.bind_port;
     let log_dir = opt.log_dir;
     let print_thread_info = opt.print_thread_info;
 
-    let scheduler_name = format!("scheduler_{}_{}_{}", namespace, bind_host, 
port);
+    let scheduler_name = format!("scheduler_{}_{}_{}", namespace, 
external_host, port);

Review Comment:
   This should definitely be `external` rather than `bind`, as `bind` could 
(and probably should) be `0.0.0.0` for all instances of the scheduler.



##########
ballista/rust/scheduler/src/scheduler_server/grpc.rs:
##########
@@ -514,16 +514,15 @@ mod test {
     #[tokio::test]
     async fn test_poll_work() -> Result<(), BallistaError> {
         let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
-        let namespace = "default";
         let scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::new(
+                "localhost:50050".to_owned(),
                 state_storage.clone(),
-                namespace.to_owned(),
                 BallistaCodec::default(),
             );
         let exec_meta = ExecutorRegistration {
             id: "abc".to_owned(),
-            optional_host: 
Some(OptionalHost::Host("http://host:8080".to_owned())),
+            optional_host: 
Some(OptionalHost::Host("http://localhost:8080".to_owned())),

Review Comment:
   Oh, I see this is just for testing. Never mind.



##########
ballista/rust/scheduler/src/scheduler_server/grpc.rs:
##########
@@ -390,7 +390,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerGrpc
                     .and_then(|m| {
                         m.try_into_logical_plan(
                             session_ctx.deref(),
-                            self.codec.logical_extension_codec(),
+                            self.state.codec.logical_extension_codec(),

Review Comment:
   Sorry, I'm daft - why does the codec move out of state? It needs to be 
unique to the scheduler instance for some reason?



##########
ballista/rust/scheduler/src/state/execution_graph.rs:
##########
@@ -410,87 +594,33 @@ impl ExecutionGraph {
         session_ctx: &SessionContext,
     ) -> Result<ExecutionGraph> {
         let mut stages: HashMap<usize, ExecutionStage> = HashMap::new();
-        for stage in proto.stages {
-            let plan_proto = U::try_decode(stage.plan.as_slice())?;
-            let plan = plan_proto.try_into_physical_plan(
-                session_ctx,
-                session_ctx.runtime_env().as_ref(),
-                codec.physical_extension_codec(),
-            )?;
-
-            let stage_id = stage.stage_id as usize;
-            let partitions: usize = stage.partitions as usize;
-
-            let mut task_statuses: Vec<Option<task_status::Status>> =
-                vec![None; partitions];
-
-            for status in stage.task_statuses {
-                if let Some(task_id) = status.task_id.as_ref() {
-                    task_statuses[task_id.partition_id as usize] = 
status.status
+        for graph_stage in proto.stages {
+            let stage_type = graph_stage.stage_type.expect("Unexpected empty 
stage");

Review Comment:
   I'd love to see this turned into a `?`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to