realno commented on a change in pull request #1783:
URL: https://github.com/apache/arrow-datafusion/pull/1783#discussion_r802312529



##########
File path: ballista/rust/executor/src/main.rs
##########
@@ -148,3 +167,108 @@ async fn main() -> Result<()> {
 
     Ok(())
 }
+
+/// This function will scheduled periodically for cleanup executor.
+/// Will only clean the dir under work_dir not include file
+async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> {
+    let mut dir = fs::read_dir(work_dir).await?;
+    let mut to_deleted = Vec::new();
+    let mut need_delete_dir;
+    while let Some(child) = dir.next_entry().await.unwrap() {
+        if let Ok(metadata) = child.metadata().await {
+            if metadata.is_dir() {
+                let dir = fs::read_dir(child.path()).await?;
+                match check_modified_time_in_dirs(vec![dir], seconds).await {
+                    Ok(x) => match x {
+                        true => {
+                            need_delete_dir = child.path().into_os_string();
+                            to_deleted.push(need_delete_dir)
+                        }
+                        false => {}
+                    },
+                    Err(e) => {
+                        error!("Fail in clean_shuffle_data_loop {:?}", e)
+                    }
+                }
+            }
+        } else {
+            error!("can not get meta from file{:?}", child)
+        }
+    }
+    info!(
+        "Executor work_dir {:?} not modified in {:?} seconds will be deleted ",
+        &to_deleted, seconds
+    );
+    for del in to_deleted {
+        fs::remove_dir_all(del).await?;
+    }
+    Ok(())
+}
+
+/// Determines if a directory all files are older than cutoff seconds.
+async fn check_modified_time_in_dirs(

Review comment:
       I also prefer to keep the communication between the scheduler and executor 
simple. Spark writes a file in the folder after a stage succeeds. This handles 
some error cases but not all. It may be worth doing something similar: in the 
case you mentioned, where we have a small disk and need to free some space to run 
other tasks, we can delete finished stages first and clean up the rest with a TTL. 
Either way, I don't think we need to block this PR; we can create an issue to 
capture this as a future improvement. 
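
To make the marker-plus-TTL idea concrete, here is a rough sketch, not a proposal for the actual implementation. Everything named here is hypothetical: the `_SUCCESS` marker name, the `mark_stage_finished` and `dirs_to_clean` helpers, and the `need_space` flag. It uses blocking `std::fs` for brevity rather than the async `fs` calls in this PR, and it only looks at each directory's own modified time, which is a simplification of the per-file check done by `check_modified_time_in_dirs`.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use std::time::{Duration, SystemTime};

/// Hypothetical marker file name; an assumption, not something Ballista writes today.
const SUCCESS_MARKER: &str = "_SUCCESS";

/// Write an empty marker file once a stage's output is complete.
fn mark_stage_finished(stage_dir: &Path) -> io::Result<()> {
    fs::write(stage_dir.join(SUCCESS_MARKER), b"")
}

/// Collect the directories directly under `work_dir` that may be deleted.
///
/// A directory qualifies if either:
/// * `need_space` is set and the directory contains the success marker, or
/// * the directory itself has not been modified for longer than `ttl`.
///
/// Plain files under `work_dir` are skipped, as in the PR.
fn dirs_to_clean(
    work_dir: &Path,
    ttl: Duration,
    need_space: bool,
) -> io::Result<Vec<PathBuf>> {
    let now = SystemTime::now();
    let mut candidates = Vec::new();
    for entry in fs::read_dir(work_dir)? {
        let entry = entry?;
        let metadata = entry.metadata()?;
        if !metadata.is_dir() {
            continue;
        }
        let path = entry.path();
        let finished = path.join(SUCCESS_MARKER).is_file();
        // If the clock went backwards, treat the directory as brand new.
        let age = now
            .duration_since(metadata.modified()?)
            .unwrap_or(Duration::ZERO);
        if (need_space && finished) || age > ttl {
            candidates.push(path);
        }
    }
    candidates.sort();
    Ok(candidates)
}
```

With something like this, the executor would call `dirs_to_clean` with `need_space = true` only when disk is tight, and otherwise fall back to the plain TTL behaviour already in this PR.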




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

