yahoNanJing commented on code in PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#discussion_r896317299


##########
ballista/rust/scheduler/src/scheduler_server/event_loop.rs:
##########
@@ -105,9 +105,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
                 .executor_manager
                 .cancel_reservations(free_list)
                 .await?;
+            Ok(None)
+        } else if pending_tasks > 0 {

Review Comment:
   Why do we need the *else* here? ❓



##########
ballista/rust/scheduler/src/state/executor_manager.rs:
##########
@@ -15,43 +15,354 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 
-use ballista_core::serde::protobuf::ExecutorHeartbeat;
-use ballista_core::serde::scheduler::{ExecutorData, ExecutorDataChange};
-use log::{error, info, warn};
+use crate::state::backend::{Keyspace, StateBackendClient, WatchEvent};
+
+use crate::state::{decode_into, decode_protobuf, encode_protobuf, with_lock};
+use ballista_core::error::{BallistaError, Result};
+use ballista_core::serde::protobuf;
+
+use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
+use futures::StreamExt;
+use log::{debug, info};
 use parking_lot::RwLock;
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
+/// Represents a task slot that is reserved (i.e. available for scheduling but 
not visible to the
+/// rest of the system).
+/// When tasks finish we want to preferentially assign new tasks from the same 
job, so the reservation
+/// can already be assigned to a particular job ID. In that case, the 
scheduler will try to schedule
+/// available tasks for that job to the reserved task slot.
+#[derive(Clone, Debug)]
+pub struct ExecutorReservation {
+    pub executor_id: String,
+    pub job_id: Option<String>,
+}
+
+impl ExecutorReservation {
+    pub fn new_free(executor_id: String) -> Self {
+        Self {
+            executor_id,
+            job_id: None,
+        }
+    }
+
+    pub fn new_assigned(executor_id: String, job_id: String) -> Self {
+        Self {
+            executor_id,
+            job_id: Some(job_id),
+        }
+    }
+
+    pub fn assign(mut self, job_id: String) -> Self {
+        self.job_id = Some(job_id);
+        self
+    }
+
+    pub fn assigned(&self) -> bool {
+        self.job_id.is_some()
+    }
+}
+
 #[derive(Clone)]
 pub(crate) struct ExecutorManager {
-    executors_heartbeat: Arc<RwLock<HashMap<String, ExecutorHeartbeat>>>,
-    executors_data: Arc<RwLock<HashMap<String, ExecutorData>>>,
+    state: Arc<dyn StateBackendClient>,
+    executor_metadata: Arc<RwLock<HashMap<String, ExecutorMetadata>>>,
+    executors_heartbeat: Arc<RwLock<HashMap<String, 
protobuf::ExecutorHeartbeat>>>,
 }
 
 impl ExecutorManager {
-    pub(crate) fn new() -> Self {
+    pub(crate) fn new(state: Arc<dyn StateBackendClient>) -> Self {
         Self {
+            state,
+            executor_metadata: Arc::new(RwLock::new(HashMap::new())),
             executors_heartbeat: Arc::new(RwLock::new(HashMap::new())),
-            executors_data: Arc::new(RwLock::new(HashMap::new())),
         }
     }
 
-    pub(crate) fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) 
{
+    /// Initialize the `ExecutorManager` state. This will fill the 
`executor_heartbeats` value
+    /// with existing heartbeats. Then new updates will be consumed through 
the `ExecutorHeartbeatListener`
+    pub async fn init(&self) -> Result<()> {
+        self.init_executor_heartbeats().await?;
+        let heartbeat_listener = ExecutorHeartbeatListener::new(
+            self.state.clone(),
+            self.executors_heartbeat.clone(),
+        );
+        heartbeat_listener.start().await
+    }
+
+    /// Reserve up to n executor task slots. Once reserved these slots will 
not be available
+    /// for scheduling.
+    /// This operation is atomic, so if this method return an Err, no slots 
have been reserved.
+    pub async fn reserve_slots(&self, n: u32) -> 
Result<Vec<ExecutorReservation>> {
+        let lock = self.state.lock(Keyspace::Slots, "global").await?;
+
+        with_lock(lock, async {
+            debug!("Attempting to reserve {} executor slots", n);
+            let start = Instant::now();
+            let mut reservations: Vec<ExecutorReservation> = vec![];
+            let mut desired: u32 = n;
+
+            let alive_executors = self.get_alive_executors_within_one_minute();
+
+            let mut txn_ops: Vec<(Keyspace, String, Vec<u8>)> = vec![];
+
+            for executor_id in alive_executors {
+                let value = self.state.get(Keyspace::Slots, 
&executor_id).await?;
+                let mut data =
+                    decode_into::<protobuf::ExecutorData, 
ExecutorData>(&value)?;
+                let take = std::cmp::min(data.available_task_slots, desired);
+
+                for _ in 0..take {

Review Comment:
   This is a different policy from the previous round-robin one. I'm not sure
about this: would it be better for the tasks to be scheduled evenly across the executors?



##########
ballista/rust/scheduler/src/state/execution_graph.rs:
##########
@@ -0,0 +1,974 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::planner::DistributedPlanner;
+use ballista_core::error::{BallistaError, Result};
+use ballista_core::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
+
+use ballista_core::serde::protobuf::{
+    self, CompletedJob, JobStatus, QueuedJob, TaskStatus,
+};
+use ballista_core::serde::protobuf::{job_status, FailedJob, 
ShuffleWritePartition};
+use ballista_core::serde::protobuf::{task_status, RunningTask};
+use ballista_core::serde::scheduler::{
+    ExecutorMetadata, PartitionId, PartitionLocation, PartitionStats,
+};
+use datafusion::physical_plan::{
+    accept, ExecutionPlan, ExecutionPlanVisitor, Partitioning,
+};
+use log::debug;
+use std::collections::HashMap;
+use std::convert::TryInto;
+use std::fmt::{Debug, Formatter};
+
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use std::sync::Arc;
+
+/// This data structure collects the partition locations for an 
`ExecutionStage`.
+/// Each `ExecutionStage` will hold a `StageOutput`s for each of its child 
stages.
+/// When all tasks for the child stage are complete, it will mark the 
`StageOutput`
+#[derive(Clone, Debug, Default)]
+pub struct StageOutput {
+    /// Map from partition -> partition locations
+    pub(crate) partition_locations: HashMap<usize, Vec<PartitionLocation>>,
+    /// Flag indicating whether all tasks are complete
+    pub(crate) complete: bool,
+}
+
+impl StageOutput {
+    pub fn new() -> Self {
+        Self {
+            partition_locations: HashMap::new(),
+            complete: false,
+        }
+    }
+
+    /// Add a `PartitionLocation` to the `StageOutput`
+    pub fn add_partition(&mut self, partition_location: PartitionLocation) {
+        if let Some(parts) = self
+            .partition_locations
+            .get_mut(&partition_location.partition_id.partition_id)
+        {
+            parts.push(partition_location)
+        } else {
+            self.partition_locations.insert(
+                partition_location.partition_id.partition_id,
+                vec![partition_location],
+            );
+        }
+    }
+
+    pub fn is_complete(&self) -> bool {
+        self.complete
+    }
+}
+
+/// A stage in the ExecutionGraph.
+///
+/// This represents a set of tasks (one per each `partition`) which can
+/// be executed concurrently.
+#[derive(Clone)]
+pub struct ExecutionStage {
+    /// Stage ID
+    pub(crate) stage_id: usize,
+    /// Total number of output partitions for this stage.
+    /// This stage will produce on task for partition.
+    pub(crate) partitions: usize,
+    /// Output partitioning for this stage.
+    pub(crate) output_partitioning: Option<Partitioning>,
+    /// Represents the outputs from this stage's child stages.
+    /// This stage can only be resolved an executed once all child stages are 
completed.
+    pub(crate) inputs: HashMap<usize, StageOutput>,
+    // `ExecutionPlan` for this stage
+    pub(crate) plan: Arc<dyn ExecutionPlan>,
+    /// Status of each already scheduled task. If status is None, the 
partition has not yet been scheduled
+    pub(crate) task_statuses: Vec<Option<task_status::Status>>,
+    /// Stage ID of the stage that will take this stages outputs as inputs.
+    /// If `output_link` is `None` then this the final stage in the 
`ExecutionGraph`
+    pub(crate) output_link: Option<usize>,
+    /// Flag indicating whether all input partitions have been resolved and 
the plan
+    /// has UnresovledShuffleExec operators resolved to ShuffleReadExec 
operators.
+    pub(crate) resolved: bool,
+}
+
+impl Debug for ExecutionStage {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+        let scheduled_tasks = self.task_statuses.iter().filter(|t| 
t.is_some()).count();
+
+        write!(
+            f,
+            "Stage[id={}, partitions={:?}, children={}, completed_tasks={}, 
resolved={}, scheduled_tasks={}, available_tasks={}]\nInputs{:?}\n\n{}",
+            self.stage_id,
+            self.partitions,
+            self.inputs.len(),
+            self.completed_tasks(),
+            self.resolved,
+            scheduled_tasks,
+            self.available_tasks(),
+            self.inputs,
+            plan
+        )
+    }
+}
+
+impl ExecutionStage {
+    pub fn new(
+        stage_id: usize,
+        plan: Arc<dyn ExecutionPlan>,
+        output_partitioning: Option<Partitioning>,
+        output_link: Option<usize>,
+        child_stages: Vec<usize>,
+    ) -> Self {
+        let num_tasks = plan.output_partitioning().partition_count();
+
+        let resolved = child_stages.is_empty();
+
+        let mut inputs: HashMap<usize, StageOutput> = HashMap::new();
+
+        for input_stage_id in &child_stages {
+            inputs.insert(*input_stage_id, StageOutput::new());
+        }
+
+        Self {
+            stage_id,
+            partitions: num_tasks,
+            output_partitioning,
+            inputs,
+            plan,
+            task_statuses: vec![None; num_tasks],
+            output_link,
+            resolved,
+        }
+    }
+
+    /// Returns true if all inputs are complete and we can resolve all
+    /// UnresolvedShuffleExec operators to ShuffleReadExec
+    pub fn resolvable(&self) -> bool {
+        self.inputs.iter().all(|(_, outputs)| outputs.is_complete())
+    }
+
+    /// Returns `true` if all tasks for this stage are complete
+    pub fn complete(&self) -> bool {
+        self.task_statuses
+            .iter()
+            .all(|status| matches!(status, 
Some(task_status::Status::Completed(_))))
+    }
+
+    /// Returns the number of tasks
+    pub fn completed_tasks(&self) -> usize {
+        self.task_statuses
+            .iter()
+            .filter(|status| matches!(status, 
Some(task_status::Status::Completed(_))))
+            .count()
+    }
+
+    /// Marks the input stage ID as complete.
+    pub fn complete_input(&mut self, stage_id: usize) {
+        if let Some(input) = self.inputs.get_mut(&stage_id) {
+            input.complete = true;
+        }
+    }
+
+    /// Returns true if the stage plan has all UnresolvedShuffleExec operators 
resolved to
+    /// ShuffleReadExec
+    pub fn resolved(&self) -> bool {
+        self.resolved
+    }
+
+    /// Returns the number of tasks in this stage which are available for 
scheduling.
+    /// If the stage is not yet resolved, then this will return `0`, otherwise 
it will
+    /// return the number of tasks where the task status is not yet set.
+    pub fn available_tasks(&self) -> usize {
+        if self.resolved {
+            self.task_statuses.iter().filter(|s| s.is_none()).count()
+        } else {
+            0
+        }
+    }
+
+    /// Resolve any UnresolvedShuffleExec operators within this stage's plan
+    pub fn resolve_shuffles(&mut self) -> Result<()> {
+        println!("Resolving shuffles\n{:?}", self);
+        if self.resolved {
+            // If this stage has no input shuffles, then it is already resolved
+            Ok(())
+        } else {
+            let input_locations = self
+                .inputs
+                .iter()
+                .map(|(stage, outputs)| (*stage, 
outputs.partition_locations.clone()))
+                .collect();
+            // Otherwise, rewrite the plan to replace UnresolvedShuffleExec 
with ShuffleReadExec
+            let new_plan = crate::planner::remove_unresolved_shuffles(
+                self.plan.clone(),
+                &input_locations,
+            )?;
+            self.plan = new_plan;
+            self.resolved = true;
+            Ok(())
+        }
+    }
+
+    /// Update the status for task partition
+    pub fn update_task_status(&mut self, partition: usize, status: 
task_status::Status) {
+        debug!("Updating task status for partition {}", partition);
+        self.task_statuses[partition] = Some(status);
+    }
+
+    /// Add input partitions published from an input stage.
+    pub fn add_input_partitions(
+        &mut self,
+        stage_id: usize,
+        _partition_id: usize,
+        locations: Vec<PartitionLocation>,
+    ) -> Result<()> {
+        if let Some(stage_inputs) = self.inputs.get_mut(&stage_id) {
+            for partition in locations {
+                stage_inputs.add_partition(partition);
+            }
+        } else {
+            return Err(BallistaError::Internal(format!("Error adding input 
partitions to stage {}, {} is not a valid child stage ID", self.stage_id, 
stage_id)));
+        }
+
+        Ok(())
+    }
+}
+
+/// Utility for building a set of `ExecutionStage`s from
+/// a list of `ShuffleWriterExec`.
+///
+/// This will infer the dependency structure for the stages
+/// so that we can construct a DAG from the stages.
+struct ExecutionStageBuilder {
+    /// Stage ID which is currently being visited
+    current_stage_id: usize,
+    /// Map from stage ID -> List of child stage IDs
+    stage_dependencies: HashMap<usize, Vec<usize>>,
+    /// Map from Stage ID -> output link
+    output_links: HashMap<usize, usize>,

Review Comment:
   One stage may be the input of multiple sub-stages. Therefore, the type of
output_links should be *HashMap<usize, Vec<usize>>*.



##########
ballista/rust/scheduler/src/scheduler_server/event_loop.rs:
##########
@@ -60,13 +60,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
         &self,
         reservations: Vec<ExecutorReservation>,
     ) -> Result<Option<SchedulerServerEvent>> {
-        let free_list = match self
+        let (free_list, pending_tasks) = match self
             .state
             .task_manager
             .fill_reservations(&reservations)
             .await
         {
-            Ok((assignments, mut unassigned_reservations)) => {
+            Ok((assignments, mut unassigned_reservations, pending_tasks)) => {
                 for (executor_id, task) in assignments.into_iter() {

Review Comment:
   Would it be better to group the tasks for the same executor so that they can
be launched together, only once?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to