This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f46b0cd49 Fix extract parquet statistics from LargeBinary columns 
(#10775)
3f46b0cd49 is described below

commit 3f46b0cd4990a6a94594291e79b7e2e9ade45df0
Author: Xin Li <[email protected]>
AuthorDate: Tue Jun 4 17:26:50 2024 +0800

    Fix extract parquet statistics from LargeBinary columns (#10775)
    
    * Fix Extract parquet statistics from LargeBinary columns
    
    * fix
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .../datasource/physical_plan/parquet/statistics.rs | 45 +++++++++++++++++++++-
 datafusion/core/tests/parquet/arrow_statistics.rs  | 28 ++++++++++++--
 datafusion/core/tests/parquet/mod.rs               | 39 +++++++++++++++++--
 3 files changed, 102 insertions(+), 10 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 6c738cfe03..9d59808508 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -160,6 +160,9 @@ macro_rules! get_statistic {
                     Some(DataType::Binary) => {
                         
Some(ScalarValue::Binary(Some(s.$bytes_func().to_vec())))
                     }
+                    Some(DataType::LargeBinary) => {
+                        
Some(ScalarValue::LargeBinary(Some(s.$bytes_func().to_vec())))
+                    }
                     Some(DataType::LargeUtf8) | _ => {
                         let utf8_value = std::str::from_utf8(s.$bytes_func())
                             .map(|s| s.to_string())
@@ -427,8 +430,9 @@ mod test {
     use arrow_array::{
         new_null_array, Array, BinaryArray, BooleanArray, Date32Array, 
Date64Array,
         Decimal128Array, Float32Array, Float64Array, Int16Array, Int32Array, 
Int64Array,
-        Int8Array, RecordBatch, StringArray, StructArray, 
TimestampMicrosecondArray,
-        TimestampMillisecondArray, TimestampNanosecondArray, 
TimestampSecondArray,
+        Int8Array, LargeBinaryArray, RecordBatch, StringArray, StructArray,
+        TimestampMicrosecondArray, TimestampMillisecondArray, 
TimestampNanosecondArray,
+        TimestampSecondArray,
     };
     use arrow_schema::{Field, SchemaRef};
     use bytes::Bytes;
@@ -965,6 +969,34 @@ mod test {
         .run()
     }
 
+    #[test]
+    fn roundtrip_large_binary_array() {
+        let input: Vec<Option<&[u8]>> = vec![
+            // row group 1
+            Some(b"A"),
+            None,
+            Some(b"Q"),
+            // row group 2
+            Some(b"ZZ"),
+            Some(b"AA"),
+            None,
+            // row group 3
+            None,
+            None,
+            None,
+        ];
+
+        let expected_min: Vec<Option<&[u8]>> = vec![Some(b"A"), Some(b"AA"), 
None];
+        let expected_max: Vec<Option<&[u8]>> = vec![Some(b"Q"), Some(b"ZZ"), 
None];
+
+        Test {
+            input: large_binary_array(input),
+            expected_min: large_binary_array(expected_min),
+            expected_max: large_binary_array(expected_max),
+        }
+        .run();
+    }
+
     #[test]
     fn struct_and_non_struct() {
         // Ensures that statistics for an array that appears *after* a struct
@@ -1439,4 +1471,13 @@ mod test {
         );
         Arc::new(array)
     }
+
+    fn large_binary_array<'a>(
+        input: impl IntoIterator<Item = Option<&'a [u8]>>,
+    ) -> ArrayRef {
+        let array =
+            
LargeBinaryArray::from(input.into_iter().collect::<Vec<Option<&[u8]>>>());
+
+        Arc::new(array)
+    }
 }
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs 
b/datafusion/core/tests/parquet/arrow_statistics.rs
index c2bf75c8f0..3b7961236e 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -30,8 +30,8 @@ use arrow::datatypes::{
 use arrow_array::{
     make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, 
Date64Array,
     Decimal128Array, FixedSizeBinaryArray, Float16Array, Float32Array, 
Float64Array,
-    Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, 
RecordBatch,
-    StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
+    Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, 
LargeStringArray,
+    RecordBatch, StringArray, TimestampMicrosecondArray, 
TimestampMillisecondArray,
     TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
     UInt64Array, UInt8Array,
 };
@@ -1261,7 +1261,6 @@ async fn test_decimal() {
     }
     .run();
 }
-
 #[tokio::test]
 async fn test_dictionary() {
     let reader = TestReader {
@@ -1302,11 +1301,12 @@ async fn test_dictionary() {
 
 #[tokio::test]
 async fn test_byte() {
-    // This creates a parquet file of 4 columns
+    // This creates a parquet file of 5 columns
     // "name"
     // "service_string"
     // "service_binary"
     // "service_fixedsize"
+    // "service_large_binary"
 
     // file has 3 record batches, each has 5 rows. They will be saved into 3 
row groups
     let reader = TestReader {
@@ -1389,6 +1389,26 @@ async fn test_byte() {
         column_name: "service_fixedsize",
     }
     .run();
+
+    let expected_service_large_binary_min_values: Vec<&[u8]> =
+        vec![b"frontend five", b"backend one", b"backend eight"];
+
+    let expected_service_large_binary_max_values: Vec<&[u8]> =
+        vec![b"frontend two", b"frontend six", b"backend six"];
+
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(LargeBinaryArray::from(
+            expected_service_large_binary_min_values,
+        )),
+        expected_max: Arc::new(LargeBinaryArray::from(
+            expected_service_large_binary_max_values,
+        )),
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+        column_name: "service_large_binary",
+    }
+    .run();
 }
 
 // PeriodsInColumnNames
diff --git a/datafusion/core/tests/parquet/mod.rs 
b/datafusion/core/tests/parquet/mod.rs
index e951644f2c..fd2f184328 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -21,10 +21,10 @@ use arrow::{
     array::{
         make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, 
Date64Array,
         DictionaryArray, FixedSizeBinaryArray, Float16Array, Float32Array, 
Float64Array,
-        Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, 
StringArray,
-        StructArray, TimestampMicrosecondArray, TimestampMillisecondArray,
-        TimestampNanosecondArray, TimestampSecondArray, UInt16Array, 
UInt32Array,
-        UInt64Array, UInt8Array,
+        Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
+        LargeStringArray, StringArray, StructArray, TimestampMicrosecondArray,
+        TimestampMillisecondArray, TimestampNanosecondArray, 
TimestampSecondArray,
+        UInt16Array, UInt32Array, UInt64Array, UInt8Array,
     },
     datatypes::{DataType, Field, Int32Type, Int8Type, Schema},
     record_batch::RecordBatch,
@@ -648,6 +648,8 @@ fn make_bytearray_batch(
     string_values: Vec<&str>,
     binary_values: Vec<&[u8]>,
     fixedsize_values: Vec<&[u8; 3]>,
+    // i64 offset.
+    large_binary_values: Vec<&[u8]>,
 ) -> RecordBatch {
     let num_rows = string_values.len();
     let name: StringArray = 
std::iter::repeat(Some(name)).take(num_rows).collect();
@@ -658,6 +660,8 @@ fn make_bytearray_batch(
         .map(|value| Some(value.as_slice()))
         .collect::<Vec<_>>()
         .into();
+    let service_large_binary: LargeBinaryArray =
+        large_binary_values.iter().map(Some).collect();
 
     let schema = Schema::new(vec![
         Field::new("name", name.data_type().clone(), true),
@@ -669,6 +673,11 @@ fn make_bytearray_batch(
             service_fixedsize.data_type().clone(),
             true,
         ),
+        Field::new(
+            "service_large_binary",
+            service_large_binary.data_type().clone(),
+            true,
+        ),
     ]);
     let schema = Arc::new(schema);
 
@@ -679,6 +688,7 @@ fn make_bytearray_batch(
             Arc::new(service_string),
             Arc::new(service_binary),
             Arc::new(service_fixedsize),
+            Arc::new(service_large_binary),
         ],
     )
     .unwrap()
@@ -1000,6 +1010,13 @@ fn create_data_batch(scenario: Scenario) -> 
Vec<RecordBatch> {
                         b"frontend five",
                     ],
                     vec![b"fe1", b"fe2", b"fe3", b"fe7", b"fe5"],
+                    vec![
+                        b"frontend one",
+                        b"frontend two",
+                        b"frontend three",
+                        b"frontend seven",
+                        b"frontend five",
+                    ],
                 ),
                 make_bytearray_batch(
                     "mixed",
@@ -1018,6 +1035,13 @@ fn create_data_batch(scenario: Scenario) -> 
Vec<RecordBatch> {
                         b"backend three",
                     ],
                     vec![b"fe6", b"fe4", b"be1", b"be2", b"be3"],
+                    vec![
+                        b"frontend six",
+                        b"frontend four",
+                        b"backend one",
+                        b"backend two",
+                        b"backend three",
+                    ],
                 ),
                 make_bytearray_batch(
                     "all backends",
@@ -1036,6 +1060,13 @@ fn create_data_batch(scenario: Scenario) -> 
Vec<RecordBatch> {
                         b"backend eight",
                     ],
                     vec![b"be4", b"be5", b"be6", b"be7", b"be8"],
+                    vec![
+                        b"backend four",
+                        b"backend five",
+                        b"backend six",
+                        b"backend seven",
+                        b"backend eight",
+                    ],
                 ),
             ]
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to