This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 9206bdb2 Fix for error message during testing (#707)
9206bdb2 is described below

commit 9206bdb2cd8bfd6a01cafdb5a09a37767b2b9488
Author: yahoNanJing <[email protected]>
AuthorDate: Wed Mar 15 12:45:53 2023 +0800

    Fix for error message during testing (#707)
    
    * Fix cargo clippy warnings (remove redundant `.into_iter()` calls)
    
    * Fix for error message during testing
    
    * Remove unwrap for dealing with JobQueued event
    
    * Log task ids when launching tasks
    
    ---------
    
    Co-authored-by: yangzhong <[email protected]>
---
 .../core/src/execution_plans/shuffle_reader.rs     |  1 -
 ballista/scheduler/src/cluster/event/mod.rs        |  6 ++---
 ballista/scheduler/src/cluster/memory.rs           |  1 -
 .../src/scheduler_server/query_stage_scheduler.rs  |  8 +++---
 .../src/state/execution_graph/execution_stage.rs   | 30 ++++++++++++++--------
 ballista/scheduler/src/state/mod.rs                |  1 -
 ballista/scheduler/src/state/task_manager.rs       | 18 ++++++++++++-
 ballista/scheduler/src/test_utils.rs               |  2 --
 benchmarks/src/bin/tpch.rs                         |  5 +---
 9 files changed, 43 insertions(+), 29 deletions(-)

diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs 
b/ballista/core/src/execution_plans/shuffle_reader.rs
index 20c31c50..3bab9266 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -637,7 +637,6 @@ mod tests {
 
     fn get_test_partition_locations(n: usize, path: String) -> 
Vec<PartitionLocation> {
         (0..n)
-            .into_iter()
             .map(|partition_id| PartitionLocation {
                 map_partition_id: 0,
                 partition_id: PartitionId {
diff --git a/ballista/scheduler/src/cluster/event/mod.rs 
b/ballista/scheduler/src/cluster/event/mod.rs
index 88b3c297..b4b70836 100644
--- a/ballista/scheduler/src/cluster/event/mod.rs
+++ b/ballista/scheduler/src/cluster/event/mod.rs
@@ -194,7 +194,7 @@ mod test {
             }
         });
 
-        let expected: Vec<i32> = (0..100).into_iter().collect();
+        let expected: Vec<i32> = (0..100).collect();
 
         let results = handle.await.unwrap();
         assert_eq!(results.len(), 3);
@@ -233,7 +233,7 @@ mod test {
 
         // When we reach capacity older events should be dropped so we only see
         // the last 8 events in our subscribers
-        let expected: Vec<i32> = (92..100).into_iter().collect();
+        let expected: Vec<i32> = (92..100).collect();
 
         let results = handle.await.unwrap();
         assert_eq!(results.len(), 3);
@@ -271,7 +271,7 @@ mod test {
             }
         });
 
-        let expected: Vec<i32> = (1..=100).into_iter().collect();
+        let expected: Vec<i32> = (1..=100).collect();
 
         let results = handle.await.unwrap();
         assert_eq!(results.len(), 3);
diff --git a/ballista/scheduler/src/cluster/memory.rs 
b/ballista/scheduler/src/cluster/memory.rs
index 1a852ea1..95c6b7a2 100644
--- a/ballista/scheduler/src/cluster/memory.rs
+++ b/ballista/scheduler/src/cluster/memory.rs
@@ -187,7 +187,6 @@ impl ClusterState for InMemoryClusterState {
             let slots = std::mem::take(&mut spec.available_task_slots) as 
usize;
 
             let reservations = (0..slots)
-                .into_iter()
                 .map(|_| ExecutorReservation::new_free(metadata.id.clone()))
                 .collect();
 
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs 
b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 3b035470..c4296fb8 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -131,11 +131,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
                             resubmit: false,
                         }
                     };
-                    tx_event
-                        .post_event(event)
-                        .await
-                        .map_err(|e| error!("Fail to send event due to {}", e))
-                        .unwrap();
+                    if let Err(e) = tx_event.post_event(event).await {
+                        error!("Fail to send event due to {}", e);
+                    }
                 });
             }
             QueryStageSchedulerEvent::JobSubmitted {
diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs 
b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
index f1181286..542d5116 100644
--- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
@@ -786,7 +786,12 @@ impl RunningStage {
         partition: usize,
         metrics: Vec<OperatorMetricsSet>,
     ) -> Result<()> {
-        if let Some(combined_metrics) = &mut self.stage_metrics {
+        // For some cases, task metrics not set, especially for testings.
+        if metrics.is_empty() {
+            return Ok(());
+        }
+
+        let new_metrics_set = if let Some(combined_metrics) = &mut 
self.stage_metrics {
             if metrics.len() != combined_metrics.len() {
                 return Err(BallistaError::Internal(format!("Error updating 
task metrics to stage {}, task metrics array size {} does not equal \
                 with the stage metrics array size {} for task {}", 
self.stage_id, metrics.len(), combined_metrics.len(), partition)));
@@ -801,23 +806,21 @@ impl RunningStage {
                 })
                 .collect::<Result<Vec<_>>>()?;
 
-            let new_metrics_set = combined_metrics
+            combined_metrics
                 .iter_mut()
                 .zip(metrics_values_array)
                 .map(|(first, second)| {
                     Self::combine_metrics_set(first, second, partition)
                 })
-                .collect();
-            self.stage_metrics = Some(new_metrics_set)
+                .collect()
         } else {
-            let new_metrics_set = metrics
+            metrics
                 .into_iter()
                 .map(|ms| ms.try_into())
-                .collect::<Result<Vec<_>>>()?;
-            if !new_metrics_set.is_empty() {
-                self.stage_metrics = Some(new_metrics_set)
-            }
-        }
+                .collect::<Result<Vec<_>>>()?
+        };
+        self.stage_metrics = Some(new_metrics_set);
+
         Ok(())
     }
 
@@ -935,6 +938,11 @@ impl SuccessfulStage {
                 _ => task_infos.push(None),
             }
         }
+        let stage_metrics = if self.stage_metrics.is_empty() {
+            None
+        } else {
+            Some(self.stage_metrics.clone())
+        };
         RunningStage {
             stage_id: self.stage_id,
             stage_attempt_num: self.stage_attempt_num + 1,
@@ -946,7 +954,7 @@ impl SuccessfulStage {
             task_infos,
             // It is Ok to forget the previous task failure attempts
             task_failure_numbers: vec![0; self.partitions],
-            stage_metrics: Some(self.stage_metrics.clone()),
+            stage_metrics,
         }
     }
 
diff --git a/ballista/scheduler/src/state/mod.rs 
b/ballista/scheduler/src/state/mod.rs
index d38415bd..57ea0be6 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -170,7 +170,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
 
         let total_num_tasks = tasks_status.len();
         let reservations = (0..total_num_tasks)
-            .into_iter()
             .map(|_| ExecutorReservation::new_free(executor_id.to_owned()))
             .collect();
 
diff --git a/ballista/scheduler/src/state/task_manager.rs 
b/ballista/scheduler/src/state/task_manager.rs
index 00532962..0d3293f7 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -85,7 +85,23 @@ impl TaskLauncher for DefaultTaskLauncher {
         tasks: Vec<MultiTaskDefinition>,
         executor_manager: &ExecutorManager,
     ) -> Result<()> {
-        info!("Launching multi task on executor {:?}", executor.id);
+        if log::max_level() >= log::Level::Info {
+            let tasks_ids: Vec<String> = tasks
+                .iter()
+                .map(|task| {
+                    let task_ids: Vec<u32> = task
+                        .task_ids
+                        .iter()
+                        .map(|task_id| task_id.partition_id)
+                        .collect();
+                    format!("{}/{}/{:?}", task.job_id, task.stage_id, task_ids)
+                })
+                .collect();
+            info!(
+                "Launching multi task on executor {:?} for {:?}",
+                executor.id, tasks_ids
+            );
+        }
         let mut client = executor_manager.get_client(&executor.id).await?;
         client
             .launch_multi_task(protobuf::LaunchMultiTaskParams {
diff --git a/ballista/scheduler/src/test_utils.rs 
b/ballista/scheduler/src/test_utils.rs
index e0971dc6..ee35958d 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -278,7 +278,6 @@ pub fn default_task_runner() -> impl TaskRunner {
             };
 
         let partitions: Vec<ShuffleWritePartition> = (0..partitions)
-            .into_iter()
             .map(|i| ShuffleWritePartition {
                 partition_id: i as u64,
                 path: String::default(),
@@ -409,7 +408,6 @@ impl SchedulerTest {
         let runner = runner.unwrap_or_else(|| Arc::new(default_task_runner()));
 
         let executors: HashMap<String, VirtualExecutor> = (0..num_executors)
-            .into_iter()
             .map(|i| {
                 let id = format!("virtual-executor-{i}");
                 let executor = VirtualExecutor {
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 0608f3e1..a72c92ec 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -981,10 +981,7 @@ impl BenchmarkRun {
                 .duration_since(SystemTime::UNIX_EPOCH)
                 .expect("current time is later than UNIX_EPOCH")
                 .as_secs(),
-            arguments: std::env::args()
-                .skip(1)
-                .into_iter()
-                .collect::<Vec<String>>(),
+            arguments: std::env::args().skip(1).collect::<Vec<String>>(),
             query,
             iterations: vec![],
         }

Reply via email to