This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new ae5b23ec3 Stop ignoring errors when writing DataFrame to csv, parquet, 
json (#3801)
ae5b23ec3 is described below

commit ae5b23ec3c9b22e917c6e1b0a15e20396c82b8bd
Author: Andy Grove <[email protected]>
AuthorDate: Tue Oct 11 17:17:43 2022 -0600

    Stop ignoring errors when writing DataFrame to csv, parquet, json (#3801)
---
 .../core/src/physical_plan/file_format/csv.rs      | 24 +++++++++++++++++++++-
 .../core/src/physical_plan/file_format/json.rs     | 24 +++++++++++++++++++++-
 .../core/src/physical_plan/file_format/parquet.rs  | 24 +++++++++++++++++++++-
 datafusion/core/tests/csv/corrupt.csv              |  7 +++++++
 4 files changed, 76 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs 
b/datafusion/core/src/physical_plan/file_format/csv.rs
index 03e1e8059..d086a7798 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -270,7 +270,12 @@ pub async fn plan_to_csv(
                 });
                 tasks.push(handle);
             }
-            futures::future::join_all(tasks).await;
+            futures::future::join_all(tasks)
+                .await
+                .into_iter()
+                .try_for_each(|result| {
+                    result.map_err(|e| 
DataFusionError::Execution(format!("{}", e)))?
+                })?;
             Ok(())
         }
         Err(e) => Err(DataFusionError::Execution(format!(
@@ -599,6 +604,23 @@ mod tests {
         }
     }
 
+    #[tokio::test]
+    async fn write_csv_results_error_handling() -> Result<()> {
+        let ctx = SessionContext::new();
+        let options = CsvReadOptions::default()
+            .schema_infer_max_records(2)
+            .has_header(true);
+        let df = ctx.read_csv("tests/csv/corrupt.csv", options).await?;
+        let tmp_dir = TempDir::new()?;
+        let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
+        let e = df
+            .write_csv(&out_dir)
+            .await
+            .expect_err("should fail because input file does not match 
inferred schema");
+        assert_eq!("Arrow error: Parser error: Error while parsing value d for 
column 0 at line 4", format!("{}", e));
+        Ok(())
+    }
+
     #[tokio::test]
     async fn write_csv_results() -> Result<()> {
         // create partitioned input file and context
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs 
b/datafusion/core/src/physical_plan/file_format/json.rs
index 9475be156..d207988f4 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -233,7 +233,12 @@ pub async fn plan_to_json(
                 });
                 tasks.push(handle);
             }
-            futures::future::join_all(tasks).await;
+            futures::future::join_all(tasks)
+                .await
+                .into_iter()
+                .try_for_each(|result| {
+                    result.map_err(|e| 
DataFusionError::Execution(format!("{}", e)))?
+                })?;
             Ok(())
         }
         Err(e) => Err(DataFusionError::Execution(format!(
@@ -587,4 +592,21 @@ mod tests {
             );
         }
     }
+
+    #[tokio::test]
+    async fn write_json_results_error_handling() -> Result<()> {
+        let ctx = SessionContext::new();
+        let options = CsvReadOptions::default()
+            .schema_infer_max_records(2)
+            .has_header(true);
+        let df = ctx.read_csv("tests/csv/corrupt.csv", options).await?;
+        let tmp_dir = TempDir::new()?;
+        let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
+        let e = df
+            .write_json(&out_dir)
+            .await
+            .expect_err("should fail because input file does not match 
inferred schema");
+        assert_eq!("Arrow error: Parser error: Error while parsing value d for 
column 0 at line 4", format!("{}", e));
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs 
b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 68c902a40..5f72c7acc 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -861,7 +861,12 @@ pub async fn plan_to_parquet(
                     });
                 tasks.push(handle);
             }
-            futures::future::join_all(tasks).await;
+            futures::future::join_all(tasks)
+                .await
+                .into_iter()
+                .try_for_each(|result| {
+                    result.map_err(|e| 
DataFusionError::Execution(format!("{}", e)))?
+                })?;
             Ok(())
         }
         Err(e) => Err(DataFusionError::Execution(format!(
@@ -979,6 +984,23 @@ mod tests {
         )
     }
 
+    #[tokio::test]
+    async fn write_parquet_results_error_handling() -> Result<()> {
+        let ctx = SessionContext::new();
+        let options = CsvReadOptions::default()
+            .schema_infer_max_records(2)
+            .has_header(true);
+        let df = ctx.read_csv("tests/csv/corrupt.csv", options).await?;
+        let tmp_dir = TempDir::new()?;
+        let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
+        let e = df
+            .write_parquet(&out_dir, None)
+            .await
+            .expect_err("should fail because input file does not match 
inferred schema");
+        assert_eq!("Parquet error: Arrow: underlying Arrow error: Parser 
error: Error while parsing value d for column 0 at line 4", format!("{}", e));
+        Ok(())
+    }
+
     #[tokio::test]
     async fn evolved_schema() {
         let c1: ArrayRef =
diff --git a/datafusion/core/tests/csv/corrupt.csv 
b/datafusion/core/tests/csv/corrupt.csv
new file mode 100644
index 000000000..58af6b975
--- /dev/null
+++ b/datafusion/core/tests/csv/corrupt.csv
@@ -0,0 +1,7 @@
+num,str
+1,a
+2,b
+3,c
+d,4
+4,d
+5,e
\ No newline at end of file

Reply via email to