This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new fcd37ee Return error from JSON writer rather than panic (#1205)
fcd37ee is described below
commit fcd37ee1f8a8c3f5af099419aa9667bf0a515595
Author: Yang <[email protected]>
AuthorDate: Thu Jan 20 05:28:52 2022 +0800
Return error from JSON writer rather than panic (#1205)
* Return error from JSON writer rather than panic
* fix comment
---
arrow/src/json/writer.rs | 144 ++++++++++++++++++++++++-----------------------
1 file changed, 73 insertions(+), 71 deletions(-)
diff --git a/arrow/src/json/writer.rs b/arrow/src/json/writer.rs
index 787ed15..0a96392 100644
--- a/arrow/src/json/writer.rs
+++ b/arrow/src/json/writer.rs
@@ -38,7 +38,7 @@
//! let a = Int32Array::from(vec![1, 2, 3]);
//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
//!
-//! let json_rows = json::writer::record_batches_to_json_rows(&[batch]);
+//! let json_rows = json::writer::record_batches_to_json_rows(&[batch]).unwrap();
//! assert_eq!(
//! serde_json::Value::Object(json_rows[1].clone()),
//! serde_json::json!({"a": 2}),
@@ -110,64 +110,61 @@ use serde_json::Value;
use crate::array::*;
use crate::datatypes::*;
-use crate::error::Result;
+use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;
-fn primitive_array_to_json<T: ArrowPrimitiveType>(array: &ArrayRef) -> Vec<Value> {
- as_primitive_array::<T>(array)
+fn primitive_array_to_json<T: ArrowPrimitiveType>(
+ array: &ArrayRef,
+) -> Result<Vec<Value>> {
+ Ok(as_primitive_array::<T>(array)
.iter()
.map(|maybe_value| match maybe_value {
Some(v) => v.into_json_value().unwrap_or(Value::Null),
None => Value::Null,
})
- .collect()
+ .collect())
}
fn struct_array_to_jsonmap_array(
array: &StructArray,
row_count: usize,
-) -> Vec<JsonMap<String, Value>> {
+) -> Result<Vec<JsonMap<String, Value>>> {
let inner_col_names = array.column_names();
let mut inner_objs = iter::repeat(JsonMap::new())
.take(row_count)
.collect::<Vec<JsonMap<String, Value>>>();
- array
- .columns()
- .iter()
- .enumerate()
- .for_each(|(j, struct_col)| {
- set_column_for_json_rows(
- &mut inner_objs,
- row_count,
- struct_col,
- inner_col_names[j],
- );
- });
-
- inner_objs
+ for (j, struct_col) in array.columns().iter().enumerate() {
+ set_column_for_json_rows(
+ &mut inner_objs,
+ row_count,
+ struct_col,
+ inner_col_names[j],
+ )?
+ }
+ Ok(inner_objs)
}
/// Converts an arrow [`ArrayRef`] into a `Vec` of Serde JSON [`serde_json::Value`]'s
-pub fn array_to_json_array(array: &ArrayRef) -> Vec<Value> {
+pub fn array_to_json_array(array: &ArrayRef) -> Result<Vec<Value>> {
match array.data_type() {
- DataType::Null => iter::repeat(Value::Null).take(array.len()).collect(),
- DataType::Boolean => as_boolean_array(array)
+ DataType::Null => Ok(iter::repeat(Value::Null).take(array.len()).collect()),
+ DataType::Boolean => Ok(as_boolean_array(array)
.iter()
.map(|maybe_value| match maybe_value {
Some(v) => v.into(),
None => Value::Null,
})
- .collect(),
+ .collect()),
- DataType::Utf8 => as_string_array(array)
+ DataType::Utf8 => Ok(as_string_array(array)
.iter()
.map(|maybe_value| match maybe_value {
Some(v) => v.into(),
None => Value::Null,
})
- .collect(),
+ .collect()),
DataType::Int8 => primitive_array_to_json::<Int8Type>(array),
DataType::Int16 => primitive_array_to_json::<Int16Type>(array),
DataType::Int32 => primitive_array_to_json::<Int32Type>(array),
@@ -181,28 +178,26 @@ pub fn array_to_json_array(array: &ArrayRef) -> Vec<Value> {
DataType::List(_) => as_list_array(array)
.iter()
.map(|maybe_value| match maybe_value {
- Some(v) => Value::Array(array_to_json_array(&v)),
- None => Value::Null,
+ Some(v) => Ok(Value::Array(array_to_json_array(&v)?)),
+ None => Ok(Value::Null),
})
.collect(),
DataType::LargeList(_) => as_large_list_array(array)
.iter()
.map(|maybe_value| match maybe_value {
- Some(v) => Value::Array(array_to_json_array(&v)),
- None => Value::Null,
+ Some(v) => Ok(Value::Array(array_to_json_array(&v)?)),
+ None => Ok(Value::Null),
})
.collect(),
DataType::Struct(_) => {
let jsonmaps =
- struct_array_to_jsonmap_array(as_struct_array(array), array.len());
- jsonmaps.into_iter().map(Value::Object).collect()
- }
- _ => {
- panic!(
- "Unsupported datatype for array conversion: {:#?}",
- array.data_type()
- );
struct_array_to_jsonmap_array(as_struct_array(array), array.len())?;
+ Ok(jsonmaps.into_iter().map(Value::Object).collect())
}
+ t => Err(ArrowError::JsonError(format!(
+ "data type {:?} not supported",
+ t
+ ))),
}
}
@@ -261,37 +256,37 @@ fn set_column_for_json_rows(
row_count: usize,
array: &ArrayRef,
col_name: &str,
-) {
+) -> Result<()> {
match array.data_type() {
DataType::Int8 => {
- set_column_by_primitive_type::<Int8Type>(rows, row_count, array, col_name)
+ set_column_by_primitive_type::<Int8Type>(rows, row_count, array, col_name);
}
DataType::Int16 => {
- set_column_by_primitive_type::<Int16Type>(rows, row_count, array, col_name)
+ set_column_by_primitive_type::<Int16Type>(rows, row_count, array, col_name);
}
DataType::Int32 => {
- set_column_by_primitive_type::<Int32Type>(rows, row_count, array, col_name)
+ set_column_by_primitive_type::<Int32Type>(rows, row_count, array, col_name);
}
DataType::Int64 => {
- set_column_by_primitive_type::<Int64Type>(rows, row_count, array, col_name)
+ set_column_by_primitive_type::<Int64Type>(rows, row_count, array, col_name);
}
DataType::UInt8 => {
- set_column_by_primitive_type::<UInt8Type>(rows, row_count, array, col_name)
+ set_column_by_primitive_type::<UInt8Type>(rows, row_count, array, col_name);
}
DataType::UInt16 => {
- set_column_by_primitive_type::<UInt16Type>(rows, row_count, array, col_name)
+ set_column_by_primitive_type::<UInt16Type>(rows, row_count, array, col_name);
}
DataType::UInt32 => {
- set_column_by_primitive_type::<UInt32Type>(rows, row_count, array, col_name)
+ set_column_by_primitive_type::<UInt32Type>(rows, row_count, array, col_name);
}
DataType::UInt64 => {
- set_column_by_primitive_type::<UInt64Type>(rows, row_count, array, col_name)
+ set_column_by_primitive_type::<UInt64Type>(rows, row_count, array, col_name);
}
DataType::Float32 => {
- set_column_by_primitive_type::<Float32Type>(rows, row_count, array, col_name)
+ set_column_by_primitive_type::<Float32Type>(rows, row_count, array, col_name);
}
DataType::Float64 => {
- set_column_by_primitive_type::<Float64Type>(rows, row_count, array, col_name)
+ set_column_by_primitive_type::<Float64Type>(rows, row_count, array, col_name);
}
DataType::Null => {
// when value is null, we simply skip setting the key
@@ -444,7 +439,7 @@ fn set_column_for_json_rows(
}
DataType::Struct(_) => {
let inner_objs =
- struct_array_to_jsonmap_array(as_struct_array(array), row_count);
+ struct_array_to_jsonmap_array(as_struct_array(array), row_count)?;
rows.iter_mut()
.take(row_count)
.zip(inner_objs.into_iter())
@@ -457,34 +452,34 @@ fn set_column_for_json_rows(
rows.iter_mut()
.zip(listarr.iter())
.take(row_count)
- .for_each(|(row, maybe_value)| {
+ .try_for_each(|(row, maybe_value)| -> Result<()> {
if let Some(v) = maybe_value {
row.insert(
col_name.to_string(),
- Value::Array(array_to_json_array(&v)),
+ Value::Array(array_to_json_array(&v)?),
);
}
- });
+ Ok(())
+ })?;
}
DataType::LargeList(_) => {
let listarr = as_large_list_array(array);
rows.iter_mut()
.zip(listarr.iter())
.take(row_count)
- .for_each(|(row, maybe_value)| {
+ .try_for_each(|(row, maybe_value)| -> Result<()> {
if let Some(v) = maybe_value {
- row.insert(
- col_name.to_string(),
- Value::Array(array_to_json_array(&v)),
- );
+ let val = array_to_json_array(&v)?;
+ row.insert(col_name.to_string(), Value::Array(val));
}
- });
+ Ok(())
+ })?;
}
DataType::Dictionary(_, value_type) => {
let slice = array.slice(0, row_count);
let hydrated = crate::compute::kernels::cast::cast(&slice, value_type)
.expect("cannot cast dictionary to underlying values");
- set_column_for_json_rows(rows, row_count, &hydrated, col_name)
+ set_column_for_json_rows(rows, row_count, &hydrated, col_name)?;
}
DataType::Map(_, _) => {
let maparr = as_map_array(array);
@@ -494,11 +489,14 @@ fn set_column_for_json_rows(
// Keys have to be strings to convert to json.
if !matches!(keys.data_type(), DataType::Utf8) {
- panic!("Unsupported datatype: {:#?}", array.data_type());
+ return Err(ArrowError::JsonError(format!(
"data type {:?} not supported in nested map for json writer",
+ keys.data_type()
+ )));
}
let keys = as_string_array(&keys);
- let values = array_to_json_array(&values);
+ let values = array_to_json_array(&values)?;
let mut kv = keys.iter().zip(values.into_iter());
@@ -522,16 +520,20 @@ fn set_column_for_json_rows(
}
}
_ => {
- panic!("Unsupported datatype: {:#?}", array.data_type());
+ return Err(ArrowError::JsonError(format!(
+ "data type {:?} not supported in nested map for json writer",
+ array.data_type()
+ )))
}
}
+ Ok(())
}
/// Converts an arrow [`RecordBatch`] into a `Vec` of Serde JSON
/// [`JsonMap`]s (objects)
pub fn record_batches_to_json_rows(
batches: &[RecordBatch],
-) -> Vec<JsonMap<String, Value>> {
+) -> Result<Vec<JsonMap<String, Value>>> {
let mut rows: Vec<JsonMap<String, Value>> = iter::repeat(JsonMap::new())
.take(batches.iter().map(|b| b.num_rows()).sum())
.collect();
@@ -539,17 +541,17 @@ pub fn record_batches_to_json_rows(
if !rows.is_empty() {
let schema = batches[0].schema();
let mut base = 0;
- batches.iter().for_each(|batch| {
+ for batch in batches {
let row_count = batch.num_rows();
- batch.columns().iter().enumerate().for_each(|(j, col)| {
+ for (j, col) in batch.columns().iter().enumerate() {
let col_name = schema.field(j).name();
- set_column_for_json_rows(&mut rows[base..], row_count, col, col_name);
- });
+ set_column_for_json_rows(&mut rows[base..], row_count, col, col_name)?
+ }
base += row_count;
- });
+ }
}
- rows
+ Ok(rows)
}
/// This trait defines how to format a sequence of JSON objects to a
@@ -683,7 +685,7 @@ where
/// Convert the [`RecordBatch`] into JSON rows, and write them to the output
pub fn write_batches(&mut self, batches: &[RecordBatch]) -> Result<()> {
- for row in record_batches_to_json_rows(batches) {
+ for row in record_batches_to_json_rows(batches)? {
self.write_row(&Value::Object(row))?;
}
Ok(())