This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new fcaa812 Improve avro reader performance by avoiding some cloning on
avro_rs::Value (#1206)
fcaa812 is described below
commit fcaa812b89ea7b48df3a7a9ac3e09a4f3da2b735
Author: Guillaume Balaine <[email protected]>
AuthorDate: Thu Nov 4 15:54:47 2021 +0100
Improve avro reader performance by avoiding some cloning on avro_rs::Value
(#1206)
* improve avro reader performance by avoiding some cloning on avro_rs::Value
* avro reader: slight iterator improvement in next_batch
---
datafusion/src/avro_to_arrow/arrow_array_reader.rs | 92 ++++++++++++----------
1 file changed, 51 insertions(+), 41 deletions(-)
diff --git a/datafusion/src/avro_to_arrow/arrow_array_reader.rs
b/datafusion/src/avro_to_arrow/arrow_array_reader.rs
index bb863d6..9d55529 100644
--- a/datafusion/src/avro_to_arrow/arrow_array_reader.rs
+++ b/datafusion/src/avro_to_arrow/arrow_array_reader.rs
@@ -50,7 +50,7 @@ use std::collections::HashMap;
use std::io::Read;
use std::sync::Arc;
-type RecordSlice<'a> = &'a [Vec<(String, Value)>];
+type RecordSlice<'a> = &'a [&'a Vec<(String, Value)>];
pub struct AvroArrowArrayReader<'a, R: Read> {
reader: AvroReader<'a, R>,
@@ -91,30 +91,32 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
/// Read the next batch of records
#[allow(clippy::should_implement_trait)]
pub fn next_batch(&mut self, batch_size: usize) ->
ArrowResult<Option<RecordBatch>> {
- let mut rows = Vec::with_capacity(batch_size);
- for value in self.reader.by_ref().take(batch_size) {
- let v = value.map_err(|e| {
- ArrowError::ParseError(format!("Failed to parse avro value:
{:?}", e))
- })?;
- match v {
- Value::Record(v) => {
- rows.push(v);
- }
+ let rows = self
+ .reader
+ .by_ref()
+ .take(batch_size)
+ .map(|value| match value {
+ Ok(Value::Record(v)) => Ok(v),
+ Err(e) => Err(ArrowError::ParseError(format!(
+ "Failed to parse avro value: {:?}",
+ e
+ ))),
other => {
return Err(ArrowError::ParseError(format!(
"Row needs to be of type object, got: {:?}",
other
)))
}
- }
- }
+ })
+ .collect::<ArrowResult<Vec<Vec<(String, Value)>>>>()?;
if rows.is_empty() {
// reached end of file
return Ok(None);
}
- let rows = &rows[..];
+ let rows = rows.iter().collect::<Vec<&Vec<(String, Value)>>>();
let projection = self.projection.clone().unwrap_or_else(Vec::new);
- let arrays = self.build_struct_array(rows, self.schema.fields(),
&projection);
+ let arrays =
+ self.build_struct_array(rows.as_slice(), self.schema.fields(),
&projection);
let projected_fields: Vec<Field> = if projection.is_empty() {
self.schema.fields().to_vec()
} else {
@@ -287,7 +289,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
let vals: Vec<Option<String>> = if let Value::String(v) =
value {
vec![Some(v.to_string())]
} else if let Value::Array(n) = value {
- n.into_iter()
+ n.iter()
.map(|v| resolve_string(&v))
.collect::<ArrowResult<Vec<String>>>()?
.into_iter()
@@ -424,7 +426,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
/// Build a nested GenericListArray from a list of unnested `Value`s
fn build_nested_list_array<OffsetSize: OffsetSizeTrait>(
&self,
- rows: &[Value],
+ rows: &[&Value],
list_field: &Field,
) -> ArrowResult<ArrayRef> {
// build list offsets
@@ -561,6 +563,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
}
})
.collect();
+ let rows = rows.iter().collect::<Vec<&Vec<(String, Value)>>>();
let arrays =
self.build_struct_array(rows.as_slice(),
fields.as_slice(), &[])?;
let data_type = DataType::Struct(fields.clone());
@@ -735,9 +738,9 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
.iter()
.map(|row| {
self.field_lookup(field.name(), row)
- .unwrap_or(Value::Null)
+ .unwrap_or(&Value::Null)
})
- .collect::<Vec<Value>>();
+ .collect::<Vec<&Value>>();
self.build_nested_list_array::<i32>(
extracted_rows.as_slice(),
list_field,
@@ -760,15 +763,15 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
.iter()
.enumerate()
.map(|(i, row)| (i,
self.field_lookup(field.name(), row)))
- .map(|(i, v)| match v {
- // we want the field as an object, if it's
not, we treat as null
- Some(Value::Record(ref value)) => {
+ .map(|(i, v)| {
+ if let Some(Value::Record(value)) = v {
bit_util::set_bit(null_buffer.as_slice_mut(), i);
- value.clone()
+ value
+ } else {
+ panic!("expected struct got {:?}", v);
}
- _ => vec![],
})
- .collect::<Vec<Vec<(String, Value)>>>();
+ .collect::<Vec<&Vec<(String, Value)>>>();
let arrays =
self.build_struct_array(struct_rows.as_slice(),
fields, &[])?;
// construct a struct array's data in order to set
null buffer
@@ -794,7 +797,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
}
/// Read the primitive list's values into ArrayData
- fn read_primitive_list_values<T>(&self, rows: &[Value]) -> ArrayData
+ fn read_primitive_list_values<T>(&self, rows: &[&Value]) -> ArrayData
where
T: ArrowPrimitiveType + ArrowNumericType,
T::Native: num_traits::cast::NumCast,
@@ -819,11 +822,15 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
array.data().clone()
}
- fn field_lookup(&self, name: &str, row: &[(String, Value)]) ->
Option<Value> {
+ fn field_lookup<'b>(
+ &self,
+ name: &str,
+ row: &'b [(String, Value)],
+ ) -> Option<&'b Value> {
self.schema_lookup
.get(name)
.and_then(|i| row.get(*i))
- .map(|o| o.1.clone())
+ .map(|o| &o.1)
}
}
@@ -831,17 +838,16 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
/// single-value lists.
/// This is used to read into nested lists (list of list, list of struct) and
non-dictionary lists.
#[inline]
-fn flatten_values(values: &[Value]) -> Vec<Value> {
+fn flatten_values<'a>(values: &[&'a Value]) -> Vec<&'a Value> {
values
.iter()
.flat_map(|row| {
- if let Value::Array(values) = row {
- values.clone()
- } else if let Value::Null = row {
- vec![Value::Null]
+ let v = maybe_resolve_union(row);
+ if let Value::Array(values) = v {
+ values.iter().collect()
} else {
// we interpret a scalar as a single-value list to minimise
data loss
- vec![row.clone()]
+ vec![v]
}
})
.collect()
@@ -851,7 +857,7 @@ fn flatten_values(values: &[Value]) -> Vec<Value> {
/// This is useful for interpreting any Avro array as string, dropping nulls.
/// See `value_as_string`.
#[inline]
-fn flatten_string_values(values: &[Value]) -> Vec<Option<String>> {
+fn flatten_string_values(values: &[&Value]) -> Vec<Option<String>> {
values
.iter()
.flat_map(|row| {
@@ -884,8 +890,12 @@ fn resolve_string(v: &Value) -> ArrowResult<String> {
.map_err(|e| SchemaError(format!("expected resolvable string : {}", e)))
}
-fn resolve_u8(v: Value) -> AvroResult<u8> {
- let int = v.resolve(&AvroSchema::Int)?;
+fn resolve_u8(v: &Value) -> AvroResult<u8> {
+ let int = match v {
+ Value::Int(n) => Ok(Value::Int(*n)),
+ Value::Long(n) => Ok(Value::Int(*n as i32)),
+ other => Err(AvroError::GetU8(other.into())),
+ }?;
if let Value::Int(n) = int {
if n >= 0 && n <= std::convert::From::from(u8::MAX) {
return Ok(n as u8);
@@ -895,14 +905,14 @@ fn resolve_u8(v: Value) -> AvroResult<u8> {
Err(AvroError::GetU8(int.into()))
}
-fn resolve_bytes(v: Value) -> Option<Vec<u8>> {
- let v = if let Value::Union(b) = v { *b } else { v };
+fn resolve_bytes(v: &Value) -> Option<Vec<u8>> {
+ let v = if let Value::Union(b) = v { b } else { v };
match v {
- Value::Bytes(bytes) => Ok(Value::Bytes(bytes)),
- Value::String(s) => Ok(Value::Bytes(s.into_bytes())),
+ Value::Bytes(_) => Ok(v.clone()),
+ Value::String(s) => Ok(Value::Bytes(s.clone().into_bytes())),
Value::Array(items) => Ok(Value::Bytes(
items
- .into_iter()
+ .iter()
.map(resolve_u8)
.collect::<std::result::Result<Vec<_>, _>>()
.ok()?,