This is an automated email from the ASF dual-hosted git repository.
nju_yaho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 9206bdb2 Fix for error message during testing (#707)
9206bdb2 is described below
commit 9206bdb2cd8bfd6a01cafdb5a09a37767b2b9488
Author: yahoNanJing <[email protected]>
AuthorDate: Wed Mar 15 12:45:53 2023 +0800
Fix for error message during testing (#707)
* Fix cargo clippy
* Fix for error message during testing
* Remove unwrap for dealing with JobQueued event
* Log task IDs when launching tasks
---------
Co-authored-by: yangzhong <[email protected]>
---
.../core/src/execution_plans/shuffle_reader.rs | 1 -
ballista/scheduler/src/cluster/event/mod.rs | 6 ++---
ballista/scheduler/src/cluster/memory.rs | 1 -
.../src/scheduler_server/query_stage_scheduler.rs | 8 +++---
.../src/state/execution_graph/execution_stage.rs | 30 ++++++++++++++--------
ballista/scheduler/src/state/mod.rs | 1 -
ballista/scheduler/src/state/task_manager.rs | 18 ++++++++++++-
ballista/scheduler/src/test_utils.rs | 2 --
benchmarks/src/bin/tpch.rs | 5 +---
9 files changed, 43 insertions(+), 29 deletions(-)
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs
index 20c31c50..3bab9266 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -637,7 +637,6 @@ mod tests {
fn get_test_partition_locations(n: usize, path: String) ->
Vec<PartitionLocation> {
(0..n)
- .into_iter()
.map(|partition_id| PartitionLocation {
map_partition_id: 0,
partition_id: PartitionId {
diff --git a/ballista/scheduler/src/cluster/event/mod.rs b/ballista/scheduler/src/cluster/event/mod.rs
index 88b3c297..b4b70836 100644
--- a/ballista/scheduler/src/cluster/event/mod.rs
+++ b/ballista/scheduler/src/cluster/event/mod.rs
@@ -194,7 +194,7 @@ mod test {
}
});
- let expected: Vec<i32> = (0..100).into_iter().collect();
+ let expected: Vec<i32> = (0..100).collect();
let results = handle.await.unwrap();
assert_eq!(results.len(), 3);
@@ -233,7 +233,7 @@ mod test {
// When we reach capacity older events should be dropped so we only see
// the last 8 events in our subscribers
- let expected: Vec<i32> = (92..100).into_iter().collect();
+ let expected: Vec<i32> = (92..100).collect();
let results = handle.await.unwrap();
assert_eq!(results.len(), 3);
@@ -271,7 +271,7 @@ mod test {
}
});
- let expected: Vec<i32> = (1..=100).into_iter().collect();
+ let expected: Vec<i32> = (1..=100).collect();
let results = handle.await.unwrap();
assert_eq!(results.len(), 3);
diff --git a/ballista/scheduler/src/cluster/memory.rs b/ballista/scheduler/src/cluster/memory.rs
index 1a852ea1..95c6b7a2 100644
--- a/ballista/scheduler/src/cluster/memory.rs
+++ b/ballista/scheduler/src/cluster/memory.rs
@@ -187,7 +187,6 @@ impl ClusterState for InMemoryClusterState {
let slots = std::mem::take(&mut spec.available_task_slots) as
usize;
let reservations = (0..slots)
- .into_iter()
.map(|_| ExecutorReservation::new_free(metadata.id.clone()))
.collect();
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 3b035470..c4296fb8 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -131,11 +131,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
resubmit: false,
}
};
- tx_event
- .post_event(event)
- .await
- .map_err(|e| error!("Fail to send event due to {}", e))
- .unwrap();
+ if let Err(e) = tx_event.post_event(event).await {
+ error!("Fail to send event due to {}", e);
+ }
});
}
QueryStageSchedulerEvent::JobSubmitted {
diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
index f1181286..542d5116 100644
--- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
@@ -786,7 +786,12 @@ impl RunningStage {
partition: usize,
metrics: Vec<OperatorMetricsSet>,
) -> Result<()> {
- if let Some(combined_metrics) = &mut self.stage_metrics {
+ // For some cases, task metrics not set, especially for testings.
+ if metrics.is_empty() {
+ return Ok(());
+ }
+
+ let new_metrics_set = if let Some(combined_metrics) = &mut
self.stage_metrics {
if metrics.len() != combined_metrics.len() {
return Err(BallistaError::Internal(format!("Error updating
task metrics to stage {}, task metrics array size {} does not equal \
with the stage metrics array size {} for task {}",
self.stage_id, metrics.len(), combined_metrics.len(), partition)));
@@ -801,23 +806,21 @@ impl RunningStage {
})
.collect::<Result<Vec<_>>>()?;
- let new_metrics_set = combined_metrics
+ combined_metrics
.iter_mut()
.zip(metrics_values_array)
.map(|(first, second)| {
Self::combine_metrics_set(first, second, partition)
})
- .collect();
- self.stage_metrics = Some(new_metrics_set)
+ .collect()
} else {
- let new_metrics_set = metrics
+ metrics
.into_iter()
.map(|ms| ms.try_into())
- .collect::<Result<Vec<_>>>()?;
- if !new_metrics_set.is_empty() {
- self.stage_metrics = Some(new_metrics_set)
- }
- }
+ .collect::<Result<Vec<_>>>()?
+ };
+ self.stage_metrics = Some(new_metrics_set);
+
Ok(())
}
@@ -935,6 +938,11 @@ impl SuccessfulStage {
_ => task_infos.push(None),
}
}
+ let stage_metrics = if self.stage_metrics.is_empty() {
+ None
+ } else {
+ Some(self.stage_metrics.clone())
+ };
RunningStage {
stage_id: self.stage_id,
stage_attempt_num: self.stage_attempt_num + 1,
@@ -946,7 +954,7 @@ impl SuccessfulStage {
task_infos,
// It is Ok to forget the previous task failure attempts
task_failure_numbers: vec![0; self.partitions],
- stage_metrics: Some(self.stage_metrics.clone()),
+ stage_metrics,
}
}
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index d38415bd..57ea0be6 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -170,7 +170,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
let total_num_tasks = tasks_status.len();
let reservations = (0..total_num_tasks)
- .into_iter()
.map(|_| ExecutorReservation::new_free(executor_id.to_owned()))
.collect();
diff --git a/ballista/scheduler/src/state/task_manager.rs b/ballista/scheduler/src/state/task_manager.rs
index 00532962..0d3293f7 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -85,7 +85,23 @@ impl TaskLauncher for DefaultTaskLauncher {
tasks: Vec<MultiTaskDefinition>,
executor_manager: &ExecutorManager,
) -> Result<()> {
- info!("Launching multi task on executor {:?}", executor.id);
+ if log::max_level() >= log::Level::Info {
+ let tasks_ids: Vec<String> = tasks
+ .iter()
+ .map(|task| {
+ let task_ids: Vec<u32> = task
+ .task_ids
+ .iter()
+ .map(|task_id| task_id.partition_id)
+ .collect();
+ format!("{}/{}/{:?}", task.job_id, task.stage_id, task_ids)
+ })
+ .collect();
+ info!(
+ "Launching multi task on executor {:?} for {:?}",
+ executor.id, tasks_ids
+ );
+ }
let mut client = executor_manager.get_client(&executor.id).await?;
client
.launch_multi_task(protobuf::LaunchMultiTaskParams {
diff --git a/ballista/scheduler/src/test_utils.rs b/ballista/scheduler/src/test_utils.rs
index e0971dc6..ee35958d 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -278,7 +278,6 @@ pub fn default_task_runner() -> impl TaskRunner {
};
let partitions: Vec<ShuffleWritePartition> = (0..partitions)
- .into_iter()
.map(|i| ShuffleWritePartition {
partition_id: i as u64,
path: String::default(),
@@ -409,7 +408,6 @@ impl SchedulerTest {
let runner = runner.unwrap_or_else(|| Arc::new(default_task_runner()));
let executors: HashMap<String, VirtualExecutor> = (0..num_executors)
- .into_iter()
.map(|i| {
let id = format!("virtual-executor-{i}");
let executor = VirtualExecutor {
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 0608f3e1..a72c92ec 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -981,10 +981,7 @@ impl BenchmarkRun {
.duration_since(SystemTime::UNIX_EPOCH)
.expect("current time is later than UNIX_EPOCH")
.as_secs(),
- arguments: std::env::args()
- .skip(1)
- .into_iter()
- .collect::<Vec<String>>(),
+ arguments: std::env::args().skip(1).collect::<Vec<String>>(),
query,
iterations: vec![],
}