This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new ae5b23ec3 Stop ignoring errors when writing DataFrame to csv, parquet, json (#3801)
ae5b23ec3 is described below
commit ae5b23ec3c9b22e917c6e1b0a15e20396c82b8bd
Author: Andy Grove <[email protected]>
AuthorDate: Tue Oct 11 17:17:43 2022 -0600
Stop ignoring errors when writing DataFrame to csv, parquet, json (#3801)
---
.../core/src/physical_plan/file_format/csv.rs | 24 +++++++++++++++++++++-
.../core/src/physical_plan/file_format/json.rs | 24 +++++++++++++++++++++-
.../core/src/physical_plan/file_format/parquet.rs | 24 +++++++++++++++++++++-
datafusion/core/tests/csv/corrupt.csv | 7 +++++++
4 files changed, 76 insertions(+), 3 deletions(-)
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 03e1e8059..d086a7798 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -270,7 +270,12 @@ pub async fn plan_to_csv(
});
tasks.push(handle);
}
- futures::future::join_all(tasks).await;
+ futures::future::join_all(tasks)
+ .await
+ .into_iter()
+ .try_for_each(|result| {
+ result.map_err(|e| DataFusionError::Execution(format!("{}", e)))?
+ })?;
Ok(())
}
Err(e) => Err(DataFusionError::Execution(format!(
@@ -599,6 +604,23 @@ mod tests {
}
}
+ #[tokio::test]
+ async fn write_csv_results_error_handling() -> Result<()> {
+ let ctx = SessionContext::new();
+ let options = CsvReadOptions::default()
+ .schema_infer_max_records(2)
+ .has_header(true);
+ let df = ctx.read_csv("tests/csv/corrupt.csv", options).await?;
+ let tmp_dir = TempDir::new()?;
+ let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
+ let e = df
+ .write_csv(&out_dir)
+ .await
+ .expect_err("should fail because input file does not match inferred schema");
+ assert_eq!("Arrow error: Parser error: Error while parsing value d for column 0 at line 4", format!("{}", e));
+ Ok(())
+ }
+
#[tokio::test]
async fn write_csv_results() -> Result<()> {
// create partitioned input file and context
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 9475be156..d207988f4 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -233,7 +233,12 @@ pub async fn plan_to_json(
});
tasks.push(handle);
}
- futures::future::join_all(tasks).await;
+ futures::future::join_all(tasks)
+ .await
+ .into_iter()
+ .try_for_each(|result| {
+ result.map_err(|e| DataFusionError::Execution(format!("{}", e)))?
+ })?;
Ok(())
}
Err(e) => Err(DataFusionError::Execution(format!(
@@ -587,4 +592,21 @@ mod tests {
);
}
}
+
+ #[tokio::test]
+ async fn write_json_results_error_handling() -> Result<()> {
+ let ctx = SessionContext::new();
+ let options = CsvReadOptions::default()
+ .schema_infer_max_records(2)
+ .has_header(true);
+ let df = ctx.read_csv("tests/csv/corrupt.csv", options).await?;
+ let tmp_dir = TempDir::new()?;
+ let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
+ let e = df
+ .write_json(&out_dir)
+ .await
+ .expect_err("should fail because input file does not match inferred schema");
+ assert_eq!("Arrow error: Parser error: Error while parsing value d for column 0 at line 4", format!("{}", e));
+ Ok(())
+ }
}
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 68c902a40..5f72c7acc 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -861,7 +861,12 @@ pub async fn plan_to_parquet(
});
tasks.push(handle);
}
- futures::future::join_all(tasks).await;
+ futures::future::join_all(tasks)
+ .await
+ .into_iter()
+ .try_for_each(|result| {
+ result.map_err(|e| DataFusionError::Execution(format!("{}", e)))?
+ })?;
Ok(())
}
Err(e) => Err(DataFusionError::Execution(format!(
@@ -979,6 +984,23 @@ mod tests {
)
}
+ #[tokio::test]
+ async fn write_parquet_results_error_handling() -> Result<()> {
+ let ctx = SessionContext::new();
+ let options = CsvReadOptions::default()
+ .schema_infer_max_records(2)
+ .has_header(true);
+ let df = ctx.read_csv("tests/csv/corrupt.csv", options).await?;
+ let tmp_dir = TempDir::new()?;
+ let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
+ let e = df
+ .write_parquet(&out_dir, None)
+ .await
+ .expect_err("should fail because input file does not match inferred schema");
+ assert_eq!("Parquet error: Arrow: underlying Arrow error: Parser error: Error while parsing value d for column 0 at line 4", format!("{}", e));
+ Ok(())
+ }
+
#[tokio::test]
async fn evolved_schema() {
let c1: ArrayRef =
diff --git a/datafusion/core/tests/csv/corrupt.csv b/datafusion/core/tests/csv/corrupt.csv
new file mode 100644
index 000000000..58af6b975
--- /dev/null
+++ b/datafusion/core/tests/csv/corrupt.csv
@@ -0,0 +1,7 @@
+num,str
+1,a
+2,b
+3,c
+d,4
+4,d
+5,e
\ No newline at end of file