This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 2a9df64 [Ballista] Fix scheduler state mod bug (#1655)
2a9df64 is described below
commit 2a9df6435c3b38243e1d0519feccdfc3fd0fde5f
Author: gaojun2048 <[email protected]>
AuthorDate: Tue Jan 25 04:08:44 2022 +0800
[Ballista] Fix scheduler state mod bug (#1655)
* Fix the bug of task stuck
* Fix the bug of task stuck
---
ballista/rust/client/src/context.rs | 69 ++++++++++++++++++++
ballista/rust/scheduler/src/state/mod.rs | 108 ++++++++++++++++++-------------
2 files changed, 131 insertions(+), 46 deletions(-)
diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 7a11c68..3fb347b 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -353,6 +353,7 @@ impl BallistaContext {
#[cfg(test)]
mod tests {
+
#[tokio::test]
#[cfg(feature = "standalone")]
async fn test_standalone_mode() {
@@ -452,4 +453,72 @@ mod tests {
let df = context.sql("show tables;").await;
assert!(df.is_ok());
}
+
+ #[tokio::test]
+ #[cfg(feature = "standalone")]
+ async fn test_task_stuck_when_referenced_task_failed() {
+ use super::*;
+ use datafusion::arrow::datatypes::Schema;
+ use datafusion::arrow::util::pretty;
+ use datafusion::datasource::file_format::csv::CsvFormat;
+ use datafusion::datasource::file_format::parquet::ParquetFormat;
+ use datafusion::datasource::listing::{ListingOptions, ListingTable};
+
+ use ballista_core::config::{
+ BallistaConfigBuilder, BALLISTA_WITH_INFORMATION_SCHEMA,
+ };
+ use std::fs::File;
+ use std::io::Write;
+ use tempfile::TempDir;
+ let config = BallistaConfigBuilder::default()
+ .set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
+ .build()
+ .unwrap();
+ let context = BallistaContext::standalone(&config, 1).await.unwrap();
+
+ let testdata = datafusion::test_util::parquet_test_data();
+ context
+ .register_parquet("single_nan", &format!("{}/single_nan.parquet",
testdata))
+ .await
+ .unwrap();
+
+ {
+ let mut guard = context.state.lock().unwrap();
+ let csv_table = guard.tables.get("single_nan");
+
+ if let Some(table_provide) = csv_table {
+ if let Some(listing_table) = table_provide
+ .clone()
+ .as_any()
+ .downcast_ref::<ListingTable>()
+ {
+ let x = listing_table.options();
+ let error_options = ListingOptions {
+ file_extension: x.file_extension.clone(),
+ format: Arc::new(CsvFormat::default()),
+ table_partition_cols: x.table_partition_cols.clone(),
+ collect_stat: x.collect_stat,
+ target_partitions: x.target_partitions,
+ };
+ let error_table = ListingTable::new(
+ listing_table.object_store().clone(),
+ listing_table.table_path().to_string(),
+ Arc::new(Schema::new(vec![])),
+ error_options,
+ );
+ // change the table to an error table
+ guard
+ .tables
+ .insert("single_nan".to_string(),
Arc::new(error_table));
+ }
+ }
+ }
+
+ let df = context
+ .sql("select count(1) from single_nan;")
+ .await
+ .unwrap();
+ let results = df.collect().await.unwrap();
+ pretty::print_batches(&results);
+ }
}
diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs
index fb45579..45d915a 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -429,34 +429,36 @@ impl SchedulerState {
.await?;
if task_is_dead {
continue 'tasks;
- } else if let Some(task_status::Status::Completed(
- CompletedTask {
+ }
+ match &referenced_task.status {
+ Some(task_status::Status::Completed(CompletedTask {
executor_id,
partitions,
- },
- )) = &referenced_task.status
- {
- debug!("Task for unresolved shuffle input
partition {} completed and produced these shuffle partitions:\n\t{}",
- shuffle_input_partition_id,
- partitions.iter().map(|p| format!("{}={}",
p.partition_id, &p.path)).collect::<Vec<_>>().join("\n\t")
- );
- let stage_shuffle_partition_locations =
partition_locations
- .entry(unresolved_shuffle.stage_id)
- .or_insert_with(HashMap::new);
- let executor_meta = executors
- .iter()
- .find(|exec| exec.id == *executor_id)
- .unwrap()
- .clone();
-
- for shuffle_write_partition in partitions {
- let temp = stage_shuffle_partition_locations
-
.entry(shuffle_write_partition.partition_id as usize)
- .or_insert_with(Vec::new);
- let executor_meta = executor_meta.clone();
- let partition_location =
-
ballista_core::serde::scheduler::PartitionLocation {
- partition_id:
+ })) => {
+ debug!("Task for unresolved shuffle input
partition {} completed and produced these shuffle partitions:\n\t{}",
+ shuffle_input_partition_id,
+ partitions.iter().map(|p| format!("{}={}",
p.partition_id, &p.path)).collect::<Vec<_>>().join("\n\t")
+ );
+ let stage_shuffle_partition_locations =
+ partition_locations
+ .entry(unresolved_shuffle.stage_id)
+ .or_insert_with(HashMap::new);
+ let executor_meta = executors
+ .iter()
+ .find(|exec| exec.id == *executor_id)
+ .unwrap()
+ .clone();
+
+ for shuffle_write_partition in partitions {
+ let temp =
stage_shuffle_partition_locations
+ .entry(
+
shuffle_write_partition.partition_id as usize,
+ )
+ .or_insert_with(Vec::new);
+ let executor_meta = executor_meta.clone();
+ let partition_location =
+
ballista_core::serde::scheduler::PartitionLocation {
+ partition_id:
ballista_core::serde::scheduler::PartitionId {
job_id:
partition.job_id.clone(),
stage_id:
unresolved_shuffle.stage_id,
@@ -464,29 +466,43 @@ impl SchedulerState {
.partition_id
as usize,
},
- executor_meta,
- partition_stats: PartitionStats::new(
-
Some(shuffle_write_partition.num_rows),
-
Some(shuffle_write_partition.num_batches),
-
Some(shuffle_write_partition.num_bytes),
- ),
- path:
shuffle_write_partition.path.clone(),
- };
+ executor_meta,
+ partition_stats:
PartitionStats::new(
+
Some(shuffle_write_partition.num_rows),
+
Some(shuffle_write_partition.num_batches),
+
Some(shuffle_write_partition.num_bytes),
+ ),
+ path:
shuffle_write_partition.path.clone(),
+ };
+ debug!(
+ "Scheduler storing stage {} output
partition {} path: {}",
+ unresolved_shuffle.stage_id,
+
partition_location.partition_id.partition_id,
+ partition_location.path
+ );
+ temp.push(partition_location);
+ }
+ }
+ Some(task_status::Status::Failed(FailedTask {
error })) => {
+ // A task should fail when its referenced_task
fails
+ let mut status = status.clone();
+ let err_msg = error.to_string();
+ status.status =
+
Some(task_status::Status::Failed(FailedTask {
+ error: err_msg,
+ }));
+ self.save_task_status(&status).await?;
+ continue 'tasks;
+ }
+ _ => {
debug!(
- "Scheduler storing stage {} output
partition {} path: {}",
+ "Stage {} input partition {} has not
completed yet",
unresolved_shuffle.stage_id,
-
partition_location.partition_id.partition_id,
- partition_location.path
- );
- temp.push(partition_location);
+ shuffle_input_partition_id,
+ );
+ continue 'tasks;
}
- } else {
- debug!(
- "Stage {} input partition {} has not completed
yet",
- unresolved_shuffle.stage_id,
shuffle_input_partition_id,
- );
- continue 'tasks;
- }
+ };
}
}