This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a9df64  [Ballista] Fix scheduler state mod bug (#1655)
2a9df64 is described below

commit 2a9df6435c3b38243e1d0519feccdfc3fd0fde5f
Author: gaojun2048 <[email protected]>
AuthorDate: Tue Jan 25 04:08:44 2022 +0800

    [Ballista] Fix scheduler state mod bug (#1655)
    
    * Fix the bug of tasks getting stuck
    
    * Fix the bug of tasks getting stuck
---
 ballista/rust/client/src/context.rs      |  69 ++++++++++++++++++++
 ballista/rust/scheduler/src/state/mod.rs | 108 ++++++++++++++++++-------------
 2 files changed, 131 insertions(+), 46 deletions(-)

diff --git a/ballista/rust/client/src/context.rs 
b/ballista/rust/client/src/context.rs
index 7a11c68..3fb347b 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -353,6 +353,7 @@ impl BallistaContext {
 
 #[cfg(test)]
 mod tests {
+
     #[tokio::test]
     #[cfg(feature = "standalone")]
     async fn test_standalone_mode() {
@@ -452,4 +453,72 @@ mod tests {
         let df = context.sql("show tables;").await;
         assert!(df.is_ok());
     }
+
+    #[tokio::test]
+    #[cfg(feature = "standalone")]
+    async fn test_task_stuck_when_referenced_task_failed() {
+        use super::*;
+        use datafusion::arrow::datatypes::Schema;
+        use datafusion::arrow::util::pretty;
+        use datafusion::datasource::file_format::csv::CsvFormat;
+        use datafusion::datasource::file_format::parquet::ParquetFormat;
+        use datafusion::datasource::listing::{ListingOptions, ListingTable};
+
+        use ballista_core::config::{
+            BallistaConfigBuilder, BALLISTA_WITH_INFORMATION_SCHEMA,
+        };
+        use std::fs::File;
+        use std::io::Write;
+        use tempfile::TempDir;
+        let config = BallistaConfigBuilder::default()
+            .set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
+            .build()
+            .unwrap();
+        let context = BallistaContext::standalone(&config, 1).await.unwrap();
+
+        let testdata = datafusion::test_util::parquet_test_data();
+        context
+            .register_parquet("single_nan", &format!("{}/single_nan.parquet", 
testdata))
+            .await
+            .unwrap();
+
+        {
+            let mut guard = context.state.lock().unwrap();
+            let csv_table = guard.tables.get("single_nan");
+
+            if let Some(table_provide) = csv_table {
+                if let Some(listing_table) = table_provide
+                    .clone()
+                    .as_any()
+                    .downcast_ref::<ListingTable>()
+                {
+                    let x = listing_table.options();
+                    let error_options = ListingOptions {
+                        file_extension: x.file_extension.clone(),
+                        format: Arc::new(CsvFormat::default()),
+                        table_partition_cols: x.table_partition_cols.clone(),
+                        collect_stat: x.collect_stat,
+                        target_partitions: x.target_partitions,
+                    };
+                    let error_table = ListingTable::new(
+                        listing_table.object_store().clone(),
+                        listing_table.table_path().to_string(),
+                        Arc::new(Schema::new(vec![])),
+                        error_options,
+                    );
+                    // change the table to an error table
+                    guard
+                        .tables
+                        .insert("single_nan".to_string(), 
Arc::new(error_table));
+                }
+            }
+        }
+
+        let df = context
+            .sql("select count(1) from single_nan;")
+            .await
+            .unwrap();
+        let results = df.collect().await.unwrap();
+        pretty::print_batches(&results);
+    }
 }
diff --git a/ballista/rust/scheduler/src/state/mod.rs 
b/ballista/rust/scheduler/src/state/mod.rs
index fb45579..45d915a 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -429,34 +429,36 @@ impl SchedulerState {
                             .await?;
                         if task_is_dead {
                             continue 'tasks;
-                        } else if let Some(task_status::Status::Completed(
-                            CompletedTask {
+                        }
+                        match &referenced_task.status {
+                            Some(task_status::Status::Completed(CompletedTask {
                                 executor_id,
                                 partitions,
-                            },
-                        )) = &referenced_task.status
-                        {
-                            debug!("Task for unresolved shuffle input 
partition {} completed and produced these shuffle partitions:\n\t{}",
-                                shuffle_input_partition_id,
-                                partitions.iter().map(|p| format!("{}={}", 
p.partition_id, &p.path)).collect::<Vec<_>>().join("\n\t")
-                            );
-                            let stage_shuffle_partition_locations = 
partition_locations
-                                .entry(unresolved_shuffle.stage_id)
-                                .or_insert_with(HashMap::new);
-                            let executor_meta = executors
-                                .iter()
-                                .find(|exec| exec.id == *executor_id)
-                                .unwrap()
-                                .clone();
-
-                            for shuffle_write_partition in partitions {
-                                let temp = stage_shuffle_partition_locations
-                                    
.entry(shuffle_write_partition.partition_id as usize)
-                                    .or_insert_with(Vec::new);
-                                let executor_meta = executor_meta.clone();
-                                let partition_location =
-                                    
ballista_core::serde::scheduler::PartitionLocation {
-                                        partition_id:
+                            })) => {
+                                debug!("Task for unresolved shuffle input 
partition {} completed and produced these shuffle partitions:\n\t{}",
+                                    shuffle_input_partition_id,
+                                    partitions.iter().map(|p| format!("{}={}", 
p.partition_id, &p.path)).collect::<Vec<_>>().join("\n\t")
+                                );
+                                let stage_shuffle_partition_locations =
+                                    partition_locations
+                                        .entry(unresolved_shuffle.stage_id)
+                                        .or_insert_with(HashMap::new);
+                                let executor_meta = executors
+                                    .iter()
+                                    .find(|exec| exec.id == *executor_id)
+                                    .unwrap()
+                                    .clone();
+
+                                for shuffle_write_partition in partitions {
+                                    let temp = 
stage_shuffle_partition_locations
+                                        .entry(
+                                            
shuffle_write_partition.partition_id as usize,
+                                        )
+                                        .or_insert_with(Vec::new);
+                                    let executor_meta = executor_meta.clone();
+                                    let partition_location =
+                                        
ballista_core::serde::scheduler::PartitionLocation {
+                                            partition_id:
                                             
ballista_core::serde::scheduler::PartitionId {
                                                 job_id: 
partition.job_id.clone(),
                                                 stage_id: 
unresolved_shuffle.stage_id,
@@ -464,29 +466,43 @@ impl SchedulerState {
                                                     .partition_id
                                                     as usize,
                                             },
-                                        executor_meta,
-                                        partition_stats: PartitionStats::new(
-                                            
Some(shuffle_write_partition.num_rows),
-                                            
Some(shuffle_write_partition.num_batches),
-                                            
Some(shuffle_write_partition.num_bytes),
-                                        ),
-                                        path: 
shuffle_write_partition.path.clone(),
-                                    };
+                                            executor_meta,
+                                            partition_stats: 
PartitionStats::new(
+                                                
Some(shuffle_write_partition.num_rows),
+                                                
Some(shuffle_write_partition.num_batches),
+                                                
Some(shuffle_write_partition.num_bytes),
+                                            ),
+                                            path: 
shuffle_write_partition.path.clone(),
+                                        };
+                                    debug!(
+                                        "Scheduler storing stage {} output 
partition {} path: {}",
+                                        unresolved_shuffle.stage_id,
+                                        
partition_location.partition_id.partition_id,
+                                        partition_location.path
+                                    );
+                                    temp.push(partition_location);
+                                }
+                            }
+                            Some(task_status::Status::Failed(FailedTask { 
error })) => {
+                                // A task should fail when its referenced_task 
fails
+                                let mut status = status.clone();
+                                let err_msg = error.to_string();
+                                status.status =
+                                    
Some(task_status::Status::Failed(FailedTask {
+                                        error: err_msg,
+                                    }));
+                                self.save_task_status(&status).await?;
+                                continue 'tasks;
+                            }
+                            _ => {
                                 debug!(
-                                    "Scheduler storing stage {} output 
partition {} path: {}",
+                                    "Stage {} input partition {} has not 
completed yet",
                                     unresolved_shuffle.stage_id,
-                                    
partition_location.partition_id.partition_id,
-                                    partition_location.path
-                                    );
-                                temp.push(partition_location);
+                                    shuffle_input_partition_id,
+                                );
+                                continue 'tasks;
                             }
-                        } else {
-                            debug!(
-                                "Stage {} input partition {} has not completed 
yet",
-                                unresolved_shuffle.stage_id, 
shuffle_input_partition_id,
-                            );
-                            continue 'tasks;
-                        }
+                        };
                     }
                 }
 

Reply via email to