yahoNanJing commented on code in PR #153:
URL: https://github.com/apache/arrow-ballista/pull/153#discussion_r954429652


##########
ballista/rust/scheduler/src/state/execution_graph/execution_stage.rs:
##########
@@ -0,0 +1,914 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::convert::TryInto;
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
+use datafusion::physical_plan::{ExecutionPlan, Metric, Partitioning};
+use datafusion::prelude::SessionContext;
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use log::{debug, warn};
+
+use ballista_core::error::{BallistaError, Result};
+use 
ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
+use ballista_core::serde::protobuf::task_status;
+use ballista_core::serde::protobuf::{self, OperatorMetricsSet};
+use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
+use ballista_core::serde::scheduler::PartitionLocation;
+use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
+
+use crate::display::DisplayableBallistaExecutionPlan;
+
+/// A stage in the ExecutionGraph,
+/// represents a set of tasks (one per each `partition`) which can be executed 
concurrently.
+/// For a stage, there are five states. And the state machine is as follows:
+///
+/// UnResolvedStage           FailedStage
+///       ↓            ↙           ↑
+///  ResolvedStage     →     RunningStage
+///                                ↓
+///                         CompletedStage
+#[derive(Clone)]
+pub(super) enum ExecutionStage {
+    UnResolved(UnResolvedStage),
+    Resolved(ResolvedStage),
+    Running(RunningStage),
+    Completed(CompletedStage),
+    Failed(FailedStage),
+}
+
+impl Debug for ExecutionStage {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            ExecutionStage::UnResolved(unresolved_stage) => 
unresolved_stage.fmt(f),
+            ExecutionStage::Resolved(resolved_stage) => resolved_stage.fmt(f),
+            ExecutionStage::Running(running_stage) => running_stage.fmt(f),
+            ExecutionStage::Completed(completed_stage) => 
completed_stage.fmt(f),
+            ExecutionStage::Failed(failed_stage) => failed_stage.fmt(f),
+        }
+    }
+}
+
+/// For a stage whose input stages are not all completed, we say it's an 
unresolved stage
+#[derive(Clone)]
+pub(super) struct UnResolvedStage {

Review Comment:
   Agree



##########
ballista/rust/scheduler/src/state/execution_graph/execution_stage.rs:
##########
@@ -0,0 +1,914 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::convert::TryInto;
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
+use datafusion::physical_plan::{ExecutionPlan, Metric, Partitioning};
+use datafusion::prelude::SessionContext;
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use log::{debug, warn};
+
+use ballista_core::error::{BallistaError, Result};
+use 
ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
+use ballista_core::serde::protobuf::task_status;
+use ballista_core::serde::protobuf::{self, OperatorMetricsSet};
+use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
+use ballista_core::serde::scheduler::PartitionLocation;
+use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
+
+use crate::display::DisplayableBallistaExecutionPlan;
+
+/// A stage in the ExecutionGraph,
+/// represents a set of tasks (one per each `partition`) which can be executed 
concurrently.
+/// For a stage, there are five states. And the state machine is as follows:
+///
+/// UnResolvedStage           FailedStage
+///       ↓            ↙           ↑
+///  ResolvedStage     →     RunningStage
+///                                ↓
+///                         CompletedStage
+#[derive(Clone)]
+pub(super) enum ExecutionStage {
+    UnResolved(UnResolvedStage),
+    Resolved(ResolvedStage),
+    Running(RunningStage),
+    Completed(CompletedStage),
+    Failed(FailedStage),
+}
+
+impl Debug for ExecutionStage {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            ExecutionStage::UnResolved(unresolved_stage) => 
unresolved_stage.fmt(f),
+            ExecutionStage::Resolved(resolved_stage) => resolved_stage.fmt(f),
+            ExecutionStage::Running(running_stage) => running_stage.fmt(f),
+            ExecutionStage::Completed(completed_stage) => 
completed_stage.fmt(f),
+            ExecutionStage::Failed(failed_stage) => failed_stage.fmt(f),
+        }
+    }
+}
+
+/// For a stage whose input stages are not all completed, we say it's an 
unresolved stage
+#[derive(Clone)]
+pub(super) struct UnResolvedStage {
+    /// Stage ID
+    pub(super) stage_id: usize,
+    /// Output partitioning for this stage.
+    pub(super) output_partitioning: Option<Partitioning>,
+    /// Stage ID of the stage that will take this stage's outputs as inputs.
+    /// If `output_links` is empty then this is the final stage in the 
`ExecutionGraph`
+    pub(super) output_links: Vec<usize>,
+    /// Represents the outputs from this stage's child stages.
+    /// This stage can only be resolved and executed once all child stages are 
completed.
+    pub(super) inputs: HashMap<usize, StageOutput>,
+    /// `ExecutionPlan` for this stage
+    pub(super) plan: Arc<dyn ExecutionPlan>,
+}
+
+/// For a stage, if it has no inputs or all of its input stages are completed,
+/// then we say it's a resolved stage
+#[derive(Clone)]
+pub(super) struct ResolvedStage {
+    /// Stage ID
+    pub(super) stage_id: usize,
+    /// Total number of output partitions for this stage.
+    /// This stage will produce one task per partition.
+    pub(super) partitions: usize,
+    /// Output partitioning for this stage.
+    pub(super) output_partitioning: Option<Partitioning>,
+    /// Stage ID of the stage that will take this stage's outputs as inputs.
+    /// If `output_links` is empty then this is the final stage in the 
`ExecutionGraph`
+    pub(super) output_links: Vec<usize>,
+    /// `ExecutionPlan` for this stage
+    pub(super) plan: Arc<dyn ExecutionPlan>,
+}
+
+/// Different from the resolved stage, a running stage will
+/// 1. save the execution plan as an encoded one to avoid serialization cost for 
creating task definitions
+/// 2. manage the task statuses
+/// 3. manage the stage-level combined metrics
+/// Running stages will only be maintained in memory and will not be saved to the 
backend storage
+#[derive(Clone)]
+pub(super) struct RunningStage {
+    /// Stage ID
+    pub(super) stage_id: usize,
+    /// Total number of output partitions for this stage.
+    /// This stage will produce one task per partition.
+    pub(super) partitions: usize,
+    /// Output partitioning for this stage.
+    pub(super) output_partitioning: Option<Partitioning>,
+    /// Stage ID of the stage that will take this stage's outputs as inputs.
+    /// If `output_links` is empty then this is the final stage in the 
`ExecutionGraph`
+    pub(super) output_links: Vec<usize>,
+    /// `ExecutionPlan` for this stage
+    pub(super) plan: Arc<dyn ExecutionPlan>,
+    /// Status of each already scheduled task. If status is None, the 
partition has not yet been scheduled
+    pub(super) task_statuses: Vec<Option<task_status::Status>>,
+    /// Combined metrics of the already finished tasks in the stage. If it is 
None, no task is finished yet.
+    pub(super) stage_metrics: Option<Vec<MetricsSet>>,
+}
+
+/// If a stage finishes successfully, its task statuses and metrics will be 
finalized
+#[derive(Clone)]
+pub(super) struct CompletedStage {
+    /// Stage ID
+    pub(super) stage_id: usize,
+    /// Total number of output partitions for this stage.
+    /// This stage will produce one task per partition.
+    pub(super) partitions: usize,
+    /// Output partitioning for this stage.
+    pub(super) output_partitioning: Option<Partitioning>,
+    /// Stage ID of the stage that will take this stage's outputs as inputs.
+    /// If `output_links` is empty then this is the final stage in the 
`ExecutionGraph`
+    pub(super) output_links: Vec<usize>,
+    /// `ExecutionPlan` for this stage
+    pub(super) plan: Arc<dyn ExecutionPlan>,
+    /// Status of each already scheduled task.
+    pub(super) task_statuses: Vec<task_status::Status>,
+    /// Combined metrics of the already finished tasks in the stage.
+    pub(super) stage_metrics: Vec<MetricsSet>,
+}
+
+/// If a stage fails, it will be with an error message
+#[derive(Clone)]
+pub(super) struct FailedStage {
+    /// Stage ID
+    pub(super) stage_id: usize,
+    /// Total number of output partitions for this stage.
+    /// This stage will produce one task per partition.
+    pub(super) partitions: usize,
+    /// Output partitioning for this stage.
+    pub(super) output_partitioning: Option<Partitioning>,
+    /// Stage ID of the stage that will take this stage's outputs as inputs.
+    /// If `output_links` is empty then this is the final stage in the 
`ExecutionGraph`
+    pub(super) output_links: Vec<usize>,
+    /// `ExecutionPlan` for this stage
+    pub(super) plan: Arc<dyn ExecutionPlan>,
+    /// Status of each already scheduled task. If status is None, the 
partition has not yet been scheduled
+    pub(super) task_statuses: Vec<Option<task_status::Status>>,
+    /// Combined metrics of the already finished tasks in the stage. If it is 
None, no task is finished yet.
+    pub(super) stage_metrics: Option<Vec<MetricsSet>>,
+    /// Error message
+    pub(super) error_message: String,
+}
+
+impl UnResolvedStage {

Review Comment:
   Agree



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to