This is an automated email from the ASF dual-hosted git repository.

nevime pushed a commit to branch rust-parquet-arrow-writer
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 13e15fece46d51b81d47e3d205f19fb5b6d57097
Author: Neville Dipale <[email protected]>
AuthorDate: Sat Oct 17 21:04:39 2020 +0200

    ARROW-10334: [Rust] [Parquet] NullArray roundtrip
    
    This allows writing an Arrow NullArray to Parquet.
    Support for this was added to Parquet a few years ago, and the C++
    implementation supports writing null arrays.
    The array is stored as an int32 column in which every value is null.
    To implement this, we introduce a `null -> int32` cast, which creates
    an all-null int32 array of the same length.
    Semantically, the write is the same as writing an all-null int32 array,
    but we add a null array reader so that the Arrow Null data type is
    preserved when the column is read back.
    
    Closes #8484 from nevi-me/ARROW-10334
    
    Authored-by: Neville Dipale <[email protected]>
    Signed-off-by: Neville Dipale <[email protected]>
---
 rust/arrow/src/array/null.rs           |   8 +-
 rust/arrow/src/compute/kernels/cast.rs |  81 ++++++++++++-------
 rust/parquet/src/arrow/array_reader.rs | 142 ++++++++++++++++++++++++++++-----
 rust/parquet/src/arrow/arrow_writer.rs |  32 +++++---
 rust/parquet/src/arrow/schema.rs       |   6 +-
 5 files changed, 205 insertions(+), 64 deletions(-)

diff --git a/rust/arrow/src/array/null.rs b/rust/arrow/src/array/null.rs
index 190d2fa..08c7cf1 100644
--- a/rust/arrow/src/array/null.rs
+++ b/rust/arrow/src/array/null.rs
@@ -113,7 +113,7 @@ impl From<ArrayDataRef> for NullArray {
 
 impl fmt::Debug for NullArray {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "NullArray")
+        write!(f, "NullArray({})", self.len())
     }
 }
 
@@ -146,4 +146,10 @@ mod tests {
         assert_eq!(array2.null_count(), 16);
         assert_eq!(array2.offset(), 8);
     }
+
+    #[test]
+    fn test_debug_null_array() {
+        let array = NullArray::new(1024 * 1024);
+        assert_eq!(format!("{:?}", array), "NullArray(1048576)");
+    }
 }
diff --git a/rust/arrow/src/compute/kernels/cast.rs b/rust/arrow/src/compute/kernels/cast.rs
index 4e1bc85..ab34c6a 100644
--- a/rust/arrow/src/compute/kernels/cast.rs
+++ b/rust/arrow/src/compute/kernels/cast.rs
@@ -200,8 +200,7 @@ pub fn can_cast_types(from_type: &DataType, to_type: &DataType) -> bool {
         (Timestamp(_, _), Date32(_)) => true,
         (Timestamp(_, _), Date64(_)) => true,
         // date64 to timestamp might not make sense,
-
-        // end temporal casts
+        (Null, Int32) => true,
         (_, _) => false,
     }
 }
@@ -729,25 +728,31 @@ pub fn cast(array: &ArrayRef, to_type: &DataType) -> Result<ArrayRef> {
             // single integer operation, but need to avoid integer
             // math rounding down to zero
 
-            if to_size > from_size {
-                let time_array = Date64Array::from(array.data());
-                Ok(Arc::new(multiply(
-                    &time_array,
-                    &Date64Array::from(vec![to_size / from_size; array.len()]),
-                )?) as ArrayRef)
-            } else if to_size < from_size {
-                let time_array = Date64Array::from(array.data());
-                Ok(Arc::new(divide(
-                    &time_array,
-                    &Date64Array::from(vec![from_size / to_size; array.len()]),
-                )?) as ArrayRef)
-            } else {
-                cast_array_data::<Date64Type>(array, to_type.clone())
+            match to_size.cmp(&from_size) {
+                std::cmp::Ordering::Less => {
+                    let time_array = Date64Array::from(array.data());
+                    Ok(Arc::new(divide(
+                        &time_array,
+                        &Date64Array::from(vec![from_size / to_size; array.len()]),
+                    )?) as ArrayRef)
+                }
+                std::cmp::Ordering::Equal => {
+                    cast_array_data::<Date64Type>(array, to_type.clone())
+                }
+                std::cmp::Ordering::Greater => {
+                    let time_array = Date64Array::from(array.data());
+                    Ok(Arc::new(multiply(
+                        &time_array,
+                        &Date64Array::from(vec![to_size / from_size; array.len()]),
+                    )?) as ArrayRef)
+                }
             }
         }
         // date64 to timestamp might not make sense,
 
-        // end temporal casts
+        // null to primitive/flat types
+        (Null, Int32) => Ok(Arc::new(Int32Array::from(vec![None; array.len()]))),
+
         (_, _) => Err(ArrowError::ComputeError(format!(
             "Casting from {:?} to {:?} not supported",
             from_type, to_type,
@@ -2476,44 +2481,44 @@ mod tests {
 
         // Test casting TO StringArray
         let cast_type = Utf8;
-        let cast_array = cast(&array, &cast_type).expect("cast to UTF-8 succeeded");
+        let cast_array = cast(&array, &cast_type).expect("cast to UTF-8 failed");
         assert_eq!(cast_array.data_type(), &cast_type);
         assert_eq!(array_to_strings(&cast_array), expected);
 
         // Test casting TO Dictionary (with different index sizes)
 
         let cast_type = Dictionary(Box::new(Int16), Box::new(Utf8));
-        let cast_array = cast(&array, &cast_type).expect("cast succeeded");
+        let cast_array = cast(&array, &cast_type).expect("cast failed");
         assert_eq!(cast_array.data_type(), &cast_type);
         assert_eq!(array_to_strings(&cast_array), expected);
 
         let cast_type = Dictionary(Box::new(Int32), Box::new(Utf8));
-        let cast_array = cast(&array, &cast_type).expect("cast succeeded");
+        let cast_array = cast(&array, &cast_type).expect("cast failed");
         assert_eq!(cast_array.data_type(), &cast_type);
         assert_eq!(array_to_strings(&cast_array), expected);
 
         let cast_type = Dictionary(Box::new(Int64), Box::new(Utf8));
-        let cast_array = cast(&array, &cast_type).expect("cast succeeded");
+        let cast_array = cast(&array, &cast_type).expect("cast failed");
         assert_eq!(cast_array.data_type(), &cast_type);
         assert_eq!(array_to_strings(&cast_array), expected);
 
         let cast_type = Dictionary(Box::new(UInt8), Box::new(Utf8));
-        let cast_array = cast(&array, &cast_type).expect("cast succeeded");
+        let cast_array = cast(&array, &cast_type).expect("cast failed");
         assert_eq!(cast_array.data_type(), &cast_type);
         assert_eq!(array_to_strings(&cast_array), expected);
 
         let cast_type = Dictionary(Box::new(UInt16), Box::new(Utf8));
-        let cast_array = cast(&array, &cast_type).expect("cast succeeded");
+        let cast_array = cast(&array, &cast_type).expect("cast failed");
         assert_eq!(cast_array.data_type(), &cast_type);
         assert_eq!(array_to_strings(&cast_array), expected);
 
         let cast_type = Dictionary(Box::new(UInt32), Box::new(Utf8));
-        let cast_array = cast(&array, &cast_type).expect("cast succeeded");
+        let cast_array = cast(&array, &cast_type).expect("cast failed");
         assert_eq!(cast_array.data_type(), &cast_type);
         assert_eq!(array_to_strings(&cast_array), expected);
 
         let cast_type = Dictionary(Box::new(UInt64), Box::new(Utf8));
-        let cast_array = cast(&array, &cast_type).expect("cast succeeded");
+        let cast_array = cast(&array, &cast_type).expect("cast failed");
         assert_eq!(cast_array.data_type(), &cast_type);
         assert_eq!(array_to_strings(&cast_array), expected);
     }
@@ -2598,11 +2603,11 @@ mod tests {
         let expected = vec!["1", "null", "3"];
 
         // Test casting TO PrimitiveArray, different dictionary type
-        let cast_array = cast(&array, &Utf8).expect("cast to UTF-8 succeeded");
+        let cast_array = cast(&array, &Utf8).expect("cast to UTF-8 failed");
         assert_eq!(array_to_strings(&cast_array), expected);
         assert_eq!(cast_array.data_type(), &Utf8);
 
-        let cast_array = cast(&array, &Int64).expect("cast to int64 succeeded");
+        let cast_array = cast(&array, &Int64).expect("cast to int64 failed");
         assert_eq!(array_to_strings(&cast_array), expected);
         assert_eq!(cast_array.data_type(), &Int64);
     }
@@ -2621,13 +2626,13 @@ mod tests {
 
         // Cast to a dictionary (same value type, Int32)
         let cast_type = Dictionary(Box::new(UInt8), Box::new(Int32));
-        let cast_array = cast(&array, &cast_type).expect("cast succeeded");
+        let cast_array = cast(&array, &cast_type).expect("cast failed");
         assert_eq!(cast_array.data_type(), &cast_type);
         assert_eq!(array_to_strings(&cast_array), expected);
 
         // Cast to a dictionary (different value type, Int8)
         let cast_type = Dictionary(Box::new(UInt8), Box::new(Int8));
-        let cast_array = cast(&array, &cast_type).expect("cast succeeded");
+        let cast_array = cast(&array, &cast_type).expect("cast failed");
         assert_eq!(cast_array.data_type(), &cast_type);
         assert_eq!(array_to_strings(&cast_array), expected);
     }
@@ -2646,11 +2651,25 @@ mod tests {
 
         // Cast to a dictionary (same value type, Utf8)
         let cast_type = Dictionary(Box::new(UInt8), Box::new(Utf8));
-        let cast_array = cast(&array, &cast_type).expect("cast succeeded");
+        let cast_array = cast(&array, &cast_type).expect("cast failed");
         assert_eq!(cast_array.data_type(), &cast_type);
         assert_eq!(array_to_strings(&cast_array), expected);
     }
 
+    #[test]
+    fn test_cast_null_array_to_int32() {
+        let array = Arc::new(NullArray::new(6)) as ArrayRef;
+
+        let expected = Int32Array::from(vec![None; 6]);
+
+        // Cast the NullArray to Int32; every value in the result should be null
+        let cast_type = DataType::Int32;
+        let cast_array = cast(&array, &cast_type).expect("cast failed");
+        let cast_array = as_primitive_array::<Int32Type>(&cast_array);
+        assert_eq!(cast_array.data_type(), &cast_type);
+        assert_eq!(cast_array, &expected);
+    }
+
     /// Print the `DictionaryArray` `array` as a vector of strings
     fn array_to_strings(array: &ArrayRef) -> Vec<String> {
         (0..array.len())
@@ -2768,7 +2787,7 @@ mod tests {
             )),
             Arc::new(TimestampNanosecondArray::from_vec(
                 vec![1000, 2000],
-                Some(tz_name.clone()),
+                Some(tz_name),
             )),
             Arc::new(Date32Array::from(vec![1000, 2000])),
             Arc::new(Date64Array::from(vec![1000, 2000])),
diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs
index 49e652d..bee608c 100644
--- a/rust/parquet/src/arrow/array_reader.rs
+++ b/rust/parquet/src/arrow/array_reader.rs
@@ -82,6 +82,97 @@ pub trait ArrayReader {
     fn get_rep_levels(&self) -> Option<&[i16]>;
 }
 
+/// A NullArrayReader reads Parquet columns stored as null int32s with an Arrow
+/// NullArray type.
+pub struct NullArrayReader<T: DataType> {
+    data_type: ArrowType,
+    pages: Box<dyn PageIterator>,
+    def_levels_buffer: Option<Buffer>,
+    rep_levels_buffer: Option<Buffer>,
+    column_desc: ColumnDescPtr,
+    record_reader: RecordReader<T>,
+    _type_marker: PhantomData<T>,
+}
+
+impl<T: DataType> NullArrayReader<T> {
+    /// Construct null array reader.
+    pub fn new(
+        mut pages: Box<dyn PageIterator>,
+        column_desc: ColumnDescPtr,
+    ) -> Result<Self> {
+        let mut record_reader = RecordReader::<T>::new(column_desc.clone());
+        if let Some(page_reader) = pages.next() {
+            record_reader.set_page_reader(page_reader?)?;
+        }
+
+        Ok(Self {
+            data_type: ArrowType::Null,
+            pages,
+            def_levels_buffer: None,
+            rep_levels_buffer: None,
+            column_desc,
+            record_reader,
+            _type_marker: PhantomData,
+        })
+    }
+}
+
+/// Implementation of `ArrayReader` for the null array reader.
+impl<T: DataType> ArrayReader for NullArrayReader<T> {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Returns the Arrow data type of this reader, which is always `Null`.
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    /// Reads at most `batch_size` records into array.
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        let mut records_read = 0usize;
+        while records_read < batch_size {
+            let records_to_read = batch_size - records_read;
+
+            // NB can be 0 if at end of page
+            let records_read_once = self.record_reader.read_records(records_to_read)?;
+            records_read += records_read_once;
+
+            // Record reader exhausted
+            if records_read_once < records_to_read {
+                if let Some(page_reader) = self.pages.next() {
+                    // Read from new page reader
+                    self.record_reader.set_page_reader(page_reader?)?;
+                } else {
+                    // Page reader also exhausted
+                    break;
+                }
+            }
+        }
+
+        // convert to arrays
+        let array = arrow::array::NullArray::new(records_read);
+
+        // save definition and repetition buffers
+        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+        self.record_reader.reset();
+        Ok(Arc::new(array))
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        self.def_levels_buffer
+            .as_ref()
+            .map(|buf| unsafe { buf.typed_data() })
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        self.rep_levels_buffer
+            .as_ref()
+            .map(|buf| unsafe { buf.typed_data() })
+    }
+}
+
 /// Primitive array readers are leaves of array reader tree. They accept page iterator
 /// and read them into primitive arrays.
 pub struct PrimitiveArrayReader<T: DataType> {
@@ -859,10 +950,19 @@ impl<'a> ArrayReaderBuilder {
                 page_iterator,
                 column_desc,
             )?)),
-            PhysicalType::INT32 => Ok(Box::new(PrimitiveArrayReader::<Int32Type>::new(
-                page_iterator,
-                column_desc,
-            )?)),
+            PhysicalType::INT32 => {
+                if let Some(ArrowType::Null) = arrow_type {
+                    Ok(Box::new(NullArrayReader::<Int32Type>::new(
+                        page_iterator,
+                        column_desc,
+                    )?))
+                } else {
+                    Ok(Box::new(PrimitiveArrayReader::<Int32Type>::new(
+                        page_iterator,
+                        column_desc,
+                    )?))
+                }
+            }
             PhysicalType::INT64 => Ok(Box::new(PrimitiveArrayReader::<Int64Type>::new(
                 page_iterator,
                 column_desc,
@@ -903,25 +1003,23 @@ impl<'a> ArrayReaderBuilder {
                             page_iterator, column_desc, converter
                         )?))
                     }
+                } else if let Some(ArrowType::LargeBinary) = arrow_type {
+                    let converter =
+                        LargeBinaryConverter::new(LargeBinaryArrayConverter {});
+                    Ok(Box::new(ComplexObjectArrayReader::<
+                        ByteArrayType,
+                        LargeBinaryConverter,
+                    >::new(
+                        page_iterator, column_desc, converter
+                    )?))
                 } else {
-                    if let Some(ArrowType::LargeBinary) = arrow_type {
-                        let converter =
-                            LargeBinaryConverter::new(LargeBinaryArrayConverter {});
-                        Ok(Box::new(ComplexObjectArrayReader::<
-                            ByteArrayType,
-                            LargeBinaryConverter,
-                        >::new(
-                            page_iterator, column_desc, converter
-                        )?))
-                    } else {
-                        let converter = BinaryConverter::new(BinaryArrayConverter {});
-                        Ok(Box::new(ComplexObjectArrayReader::<
-                            ByteArrayType,
-                            BinaryConverter,
-                        >::new(
-                            page_iterator, column_desc, converter
-                        )?))
-                    }
+                    let converter = BinaryConverter::new(BinaryArrayConverter {});
+                    Ok(Box::new(ComplexObjectArrayReader::<
+                        ByteArrayType,
+                        BinaryConverter,
+                    >::new(
+                        page_iterator, column_desc, converter
+                    )?))
                 }
             }
             PhysicalType::FIXED_LEN_BYTE_ARRAY => {
diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs
index a17e424..ff535dc 100644
--- a/rust/parquet/src/arrow/arrow_writer.rs
+++ b/rust/parquet/src/arrow/arrow_writer.rs
@@ -128,7 +128,8 @@ fn write_leaves(
     mut levels: &mut Vec<Levels>,
 ) -> Result<()> {
     match array.data_type() {
-        ArrowDataType::Int8
+        ArrowDataType::Null
+        | ArrowDataType::Int8
         | ArrowDataType::Int16
         | ArrowDataType::Int32
         | ArrowDataType::Int64
@@ -179,7 +180,6 @@ fn write_leaves(
             "Float16 arrays not supported".to_string(),
         )),
         ArrowDataType::FixedSizeList(_, _)
-        | ArrowDataType::Null
         | ArrowDataType::Boolean
         | ArrowDataType::FixedSizeBinary(_)
         | ArrowDataType::Union(_)
@@ -279,7 +279,10 @@ fn get_levels(
     parent_rep_levels: Option<&[i16]>,
 ) -> Vec<Levels> {
     match array.data_type() {
-        ArrowDataType::Null => unimplemented!(),
+        ArrowDataType::Null => vec![Levels {
+            definition: parent_def_levels.iter().map(|v| (v - 1).max(0)).collect(),
+            repetition: None,
+        }],
         ArrowDataType::Boolean
         | ArrowDataType::Int8
         | ArrowDataType::Int16
@@ -356,7 +359,11 @@ fn get_levels(
 
             // if datatype is a primitive, we can construct levels of the child array
             match child_array.data_type() {
-                ArrowDataType::Null => unimplemented!(),
+                // TODO: The behaviour of a <list<null>> is untested
+                ArrowDataType::Null => vec![Levels {
+                    definition: list_def_levels,
+                    repetition: Some(list_rep_levels),
+                }],
                 ArrowDataType::Boolean => unimplemented!(),
                 ArrowDataType::Int8
                 | ArrowDataType::Int16
@@ -701,7 +708,7 @@ mod tests {
             expected_batch.schema(),
             None,
         )
-        .unwrap();
+        .expect("Unable to write file");
         writer.write(&expected_batch).unwrap();
         writer.close().unwrap();
 
@@ -709,7 +716,10 @@ mod tests {
         let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(reader));
         let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
 
-        let actual_batch = record_batch_reader.next().unwrap().unwrap();
+        let actual_batch = record_batch_reader
+            .next()
+            .expect("No batch found")
+            .expect("Unable to get batch");
 
         assert_eq!(expected_batch.schema(), actual_batch.schema());
         assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
@@ -778,11 +788,15 @@ mod tests {
     }
 
     #[test]
-    #[should_panic(expected = "Null arrays not supported")]
+    fn all_null_primitive_single_column() {
+        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
+        one_column_roundtrip("all_null_primitive_single_column", values, true);
+    }
+    #[test]
     fn null_single_column() {
         let values = Arc::new(NullArray::new(SMALL_SIZE));
-        one_column_roundtrip("null_single_column", values.clone(), true);
-        one_column_roundtrip("null_single_column", values, false);
+        one_column_roundtrip("null_single_column", values, true);
+        // null arrays are always nullable, a test with non-nullable nulls fails
     }
 
     #[test]
diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs
index 0cd41fe..10270ff 100644
--- a/rust/parquet/src/arrow/schema.rs
+++ b/rust/parquet/src/arrow/schema.rs
@@ -308,7 +308,10 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
     };
     // create type from field
     match field.data_type() {
-        DataType::Null => Err(ArrowError("Null arrays not supported".to_string())),
+        DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
+            .with_logical_type(LogicalType::NONE)
+            .with_repetition(repetition)
+            .build(),
         DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
             .with_repetition(repetition)
             .build(),
@@ -1501,6 +1504,7 @@ mod tests {
                 //     )))),
                 //     true,
                 // ),
+                Field::new("c35", DataType::Null, true),
             ],
             metadata,
         );

Reply via email to