This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new fcaa812  Improve avro reader performance by avoiding some cloning on avro_rs::Value (#1206)
fcaa812 is described below

commit fcaa812b89ea7b48df3a7a9ac3e09a4f3da2b735
Author: Guillaume Balaine <[email protected]>
AuthorDate: Thu Nov 4 15:54:47 2021 +0100

    Improve avro reader performance by avoiding some cloning on avro_rs::Value (#1206)
    
    * improve avro reader performance by avoiding some cloning on avro_rs::Value
    
    * avro reader: slight iterator improvement in next_batch
---
 datafusion/src/avro_to_arrow/arrow_array_reader.rs | 92 ++++++++++++----------
 1 file changed, 51 insertions(+), 41 deletions(-)

diff --git a/datafusion/src/avro_to_arrow/arrow_array_reader.rs 
b/datafusion/src/avro_to_arrow/arrow_array_reader.rs
index bb863d6..9d55529 100644
--- a/datafusion/src/avro_to_arrow/arrow_array_reader.rs
+++ b/datafusion/src/avro_to_arrow/arrow_array_reader.rs
@@ -50,7 +50,7 @@ use std::collections::HashMap;
 use std::io::Read;
 use std::sync::Arc;
 
-type RecordSlice<'a> = &'a [Vec<(String, Value)>];
+type RecordSlice<'a> = &'a [&'a Vec<(String, Value)>];
 
 pub struct AvroArrowArrayReader<'a, R: Read> {
     reader: AvroReader<'a, R>,
@@ -91,30 +91,32 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
     /// Read the next batch of records
     #[allow(clippy::should_implement_trait)]
     pub fn next_batch(&mut self, batch_size: usize) -> 
ArrowResult<Option<RecordBatch>> {
-        let mut rows = Vec::with_capacity(batch_size);
-        for value in self.reader.by_ref().take(batch_size) {
-            let v = value.map_err(|e| {
-                ArrowError::ParseError(format!("Failed to parse avro value: 
{:?}", e))
-            })?;
-            match v {
-                Value::Record(v) => {
-                    rows.push(v);
-                }
+        let rows = self
+            .reader
+            .by_ref()
+            .take(batch_size)
+            .map(|value| match value {
+                Ok(Value::Record(v)) => Ok(v),
+                Err(e) => Err(ArrowError::ParseError(format!(
+                    "Failed to parse avro value: {:?}",
+                    e
+                ))),
                 other => {
                     return Err(ArrowError::ParseError(format!(
                         "Row needs to be of type object, got: {:?}",
                         other
                     )))
                 }
-            }
-        }
+            })
+            .collect::<ArrowResult<Vec<Vec<(String, Value)>>>>()?;
         if rows.is_empty() {
             // reached end of file
             return Ok(None);
         }
-        let rows = &rows[..];
+        let rows = rows.iter().collect::<Vec<&Vec<(String, Value)>>>();
         let projection = self.projection.clone().unwrap_or_else(Vec::new);
-        let arrays = self.build_struct_array(rows, self.schema.fields(), 
&projection);
+        let arrays =
+            self.build_struct_array(rows.as_slice(), self.schema.fields(), 
&projection);
         let projected_fields: Vec<Field> = if projection.is_empty() {
             self.schema.fields().to_vec()
         } else {
@@ -287,7 +289,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                 let vals: Vec<Option<String>> = if let Value::String(v) = 
value {
                     vec![Some(v.to_string())]
                 } else if let Value::Array(n) = value {
-                    n.into_iter()
+                    n.iter()
                         .map(|v| resolve_string(&v))
                         .collect::<ArrowResult<Vec<String>>>()?
                         .into_iter()
@@ -424,7 +426,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
     /// Build a nested GenericListArray from a list of unnested `Value`s
     fn build_nested_list_array<OffsetSize: OffsetSizeTrait>(
         &self,
-        rows: &[Value],
+        rows: &[&Value],
         list_field: &Field,
     ) -> ArrowResult<ArrayRef> {
         // build list offsets
@@ -561,6 +563,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                         }
                     })
                     .collect();
+                let rows = rows.iter().collect::<Vec<&Vec<(String, Value)>>>();
                 let arrays =
                     self.build_struct_array(rows.as_slice(), 
fields.as_slice(), &[])?;
                 let data_type = DataType::Struct(fields.clone());
@@ -735,9 +738,9 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                                     .iter()
                                     .map(|row| {
                                         self.field_lookup(field.name(), row)
-                                            .unwrap_or(Value::Null)
+                                            .unwrap_or(&Value::Null)
                                     })
-                                    .collect::<Vec<Value>>();
+                                    .collect::<Vec<&Value>>();
                                 self.build_nested_list_array::<i32>(
                                     extracted_rows.as_slice(),
                                     list_field,
@@ -760,15 +763,15 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                             .iter()
                             .enumerate()
                             .map(|(i, row)| (i, 
self.field_lookup(field.name(), row)))
-                            .map(|(i, v)| match v {
-                                // we want the field as an object, if it's 
not, we treat as null
-                                Some(Value::Record(ref value)) => {
+                            .map(|(i, v)| {
+                                if let Some(Value::Record(value)) = v {
                                     
bit_util::set_bit(null_buffer.as_slice_mut(), i);
-                                    value.clone()
+                                    value
+                                } else {
+                                    panic!("expected struct got {:?}", v);
                                 }
-                                _ => vec![],
                             })
-                            .collect::<Vec<Vec<(String, Value)>>>();
+                            .collect::<Vec<&Vec<(String, Value)>>>();
                         let arrays =
                             self.build_struct_array(struct_rows.as_slice(), 
fields, &[])?;
                         // construct a struct array's data in order to set 
null buffer
@@ -794,7 +797,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
     }
 
     /// Read the primitive list's values into ArrayData
-    fn read_primitive_list_values<T>(&self, rows: &[Value]) -> ArrayData
+    fn read_primitive_list_values<T>(&self, rows: &[&Value]) -> ArrayData
     where
         T: ArrowPrimitiveType + ArrowNumericType,
         T::Native: num_traits::cast::NumCast,
@@ -819,11 +822,15 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
         array.data().clone()
     }
 
-    fn field_lookup(&self, name: &str, row: &[(String, Value)]) -> 
Option<Value> {
+    fn field_lookup<'b>(
+        &self,
+        name: &str,
+        row: &'b [(String, Value)],
+    ) -> Option<&'b Value> {
         self.schema_lookup
             .get(name)
             .and_then(|i| row.get(*i))
-            .map(|o| o.1.clone())
+            .map(|o| &o.1)
     }
 }
 
@@ -831,17 +838,16 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
 /// single-value lists.
 /// This is used to read into nested lists (list of list, list of struct) and 
non-dictionary lists.
 #[inline]
-fn flatten_values(values: &[Value]) -> Vec<Value> {
+fn flatten_values<'a>(values: &[&'a Value]) -> Vec<&'a Value> {
     values
         .iter()
         .flat_map(|row| {
-            if let Value::Array(values) = row {
-                values.clone()
-            } else if let Value::Null = row {
-                vec![Value::Null]
+            let v = maybe_resolve_union(row);
+            if let Value::Array(values) = v {
+                values.iter().collect()
             } else {
                 // we interpret a scalar as a single-value list to minimise 
data loss
-                vec![row.clone()]
+                vec![v]
             }
         })
         .collect()
@@ -851,7 +857,7 @@ fn flatten_values(values: &[Value]) -> Vec<Value> {
 /// This is useful for interpreting any Avro array as string, dropping nulls.
 /// See `value_as_string`.
 #[inline]
-fn flatten_string_values(values: &[Value]) -> Vec<Option<String>> {
+fn flatten_string_values(values: &[&Value]) -> Vec<Option<String>> {
     values
         .iter()
         .flat_map(|row| {
@@ -884,8 +890,12 @@ fn resolve_string(v: &Value) -> ArrowResult<String> {
     .map_err(|e| SchemaError(format!("expected resolvable string : {}", e)))
 }
 
-fn resolve_u8(v: Value) -> AvroResult<u8> {
-    let int = v.resolve(&AvroSchema::Int)?;
+fn resolve_u8(v: &Value) -> AvroResult<u8> {
+    let int = match v {
+        Value::Int(n) => Ok(Value::Int(*n)),
+        Value::Long(n) => Ok(Value::Int(*n as i32)),
+        other => Err(AvroError::GetU8(other.into())),
+    }?;
     if let Value::Int(n) = int {
         if n >= 0 && n <= std::convert::From::from(u8::MAX) {
             return Ok(n as u8);
@@ -895,14 +905,14 @@ fn resolve_u8(v: Value) -> AvroResult<u8> {
     Err(AvroError::GetU8(int.into()))
 }
 
-fn resolve_bytes(v: Value) -> Option<Vec<u8>> {
-    let v = if let Value::Union(b) = v { *b } else { v };
+fn resolve_bytes(v: &Value) -> Option<Vec<u8>> {
+    let v = if let Value::Union(b) = v { b } else { v };
     match v {
-        Value::Bytes(bytes) => Ok(Value::Bytes(bytes)),
-        Value::String(s) => Ok(Value::Bytes(s.into_bytes())),
+        Value::Bytes(_) => Ok(v.clone()),
+        Value::String(s) => Ok(Value::Bytes(s.clone().into_bytes())),
         Value::Array(items) => Ok(Value::Bytes(
             items
-                .into_iter()
+                .iter()
                 .map(resolve_u8)
                 .collect::<std::result::Result<Vec<_>, _>>()
                 .ok()?,

Reply via email to