This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 3f46b0cd49 Fix extract parquet statistics from LargeBinary columns
(#10775)
3f46b0cd49 is described below
commit 3f46b0cd4990a6a94594291e79b7e2e9ade45df0
Author: Xin Li <[email protected]>
AuthorDate: Tue Jun 4 17:26:50 2024 +0800
Fix extract parquet statistics from LargeBinary columns (#10775)
* Fix Extract parquet statistics from LargeBinary columns
* fix
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
.../datasource/physical_plan/parquet/statistics.rs | 45 +++++++++++++++++++++-
datafusion/core/tests/parquet/arrow_statistics.rs | 28 ++++++++++++--
datafusion/core/tests/parquet/mod.rs | 39 +++++++++++++++++--
3 files changed, 102 insertions(+), 10 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 6c738cfe03..9d59808508 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -160,6 +160,9 @@ macro_rules! get_statistic {
Some(DataType::Binary) => {
Some(ScalarValue::Binary(Some(s.$bytes_func().to_vec())))
}
+ Some(DataType::LargeBinary) => {
+ Some(ScalarValue::LargeBinary(Some(s.$bytes_func().to_vec())))
+ }
Some(DataType::LargeUtf8) | _ => {
let utf8_value = std::str::from_utf8(s.$bytes_func())
.map(|s| s.to_string())
@@ -427,8 +430,9 @@ mod test {
use arrow_array::{
new_null_array, Array, BinaryArray, BooleanArray, Date32Array,
Date64Array,
Decimal128Array, Float32Array, Float64Array, Int16Array, Int32Array,
Int64Array,
- Int8Array, RecordBatch, StringArray, StructArray,
TimestampMicrosecondArray,
- TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
+ Int8Array, LargeBinaryArray, RecordBatch, StringArray, StructArray,
+ TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray,
+ TimestampSecondArray,
};
use arrow_schema::{Field, SchemaRef};
use bytes::Bytes;
@@ -965,6 +969,34 @@ mod test {
.run()
}
+ #[test]
+ fn roundtrip_large_binary_array() {
+ let input: Vec<Option<&[u8]>> = vec![
+ // row group 1
+ Some(b"A"),
+ None,
+ Some(b"Q"),
+ // row group 2
+ Some(b"ZZ"),
+ Some(b"AA"),
+ None,
+ // row group 3
+ None,
+ None,
+ None,
+ ];
+
+ let expected_min: Vec<Option<&[u8]>> = vec![Some(b"A"), Some(b"AA"), None];
+ let expected_max: Vec<Option<&[u8]>> = vec![Some(b"Q"), Some(b"ZZ"), None];
+
+ Test {
+ input: large_binary_array(input),
+ expected_min: large_binary_array(expected_min),
+ expected_max: large_binary_array(expected_max),
+ }
+ .run();
+ }
+
#[test]
fn struct_and_non_struct() {
// Ensures that statistics for an array that appears *after* a struct
@@ -1439,4 +1471,13 @@ mod test {
);
Arc::new(array)
}
+
+ fn large_binary_array<'a>(
+ input: impl IntoIterator<Item = Option<&'a [u8]>>,
+ ) -> ArrayRef {
+ let array =
+ LargeBinaryArray::from(input.into_iter().collect::<Vec<Option<&[u8]>>>());
+
+ Arc::new(array)
+ }
}
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs
b/datafusion/core/tests/parquet/arrow_statistics.rs
index c2bf75c8f0..3b7961236e 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -30,8 +30,8 @@ use arrow::datatypes::{
use arrow_array::{
make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array,
Date64Array,
Decimal128Array, FixedSizeBinaryArray, Float16Array, Float32Array,
Float64Array,
- Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray,
RecordBatch,
- StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
+ Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
LargeStringArray,
+ RecordBatch, StringArray, TimestampMicrosecondArray,
TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
UInt64Array, UInt8Array,
};
@@ -1261,7 +1261,6 @@ async fn test_decimal() {
}
.run();
}
-
#[tokio::test]
async fn test_dictionary() {
let reader = TestReader {
@@ -1302,11 +1301,12 @@ async fn test_dictionary() {
#[tokio::test]
async fn test_byte() {
- // This creates a parquet file of 4 columns
+ // This creates a parquet file of 5 columns
// "name"
// "service_string"
// "service_binary"
// "service_fixedsize"
+ // "service_large_binary"
// file has 3 record batches, each has 5 rows. They will be saved into 3
row groups
let reader = TestReader {
@@ -1389,6 +1389,26 @@ async fn test_byte() {
column_name: "service_fixedsize",
}
.run();
+
+ let expected_service_large_binary_min_values: Vec<&[u8]> =
+ vec![b"frontend five", b"backend one", b"backend eight"];
+
+ let expected_service_large_binary_max_values: Vec<&[u8]> =
+ vec![b"frontend two", b"frontend six", b"backend six"];
+
+ Test {
+ reader: reader.build().await,
+ expected_min: Arc::new(LargeBinaryArray::from(
+ expected_service_large_binary_min_values,
+ )),
+ expected_max: Arc::new(LargeBinaryArray::from(
+ expected_service_large_binary_max_values,
+ )),
+ expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
+ expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+ column_name: "service_large_binary",
+ }
+ .run();
}
// PeriodsInColumnNames
diff --git a/datafusion/core/tests/parquet/mod.rs
b/datafusion/core/tests/parquet/mod.rs
index e951644f2c..fd2f184328 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -21,10 +21,10 @@ use arrow::{
array::{
make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array,
Date64Array,
DictionaryArray, FixedSizeBinaryArray, Float16Array, Float32Array,
Float64Array,
- Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray,
StringArray,
- StructArray, TimestampMicrosecondArray, TimestampMillisecondArray,
- TimestampNanosecondArray, TimestampSecondArray, UInt16Array,
UInt32Array,
- UInt64Array, UInt8Array,
+ Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
+ LargeStringArray, StringArray, StructArray, TimestampMicrosecondArray,
+ TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
+ UInt16Array, UInt32Array, UInt64Array, UInt8Array,
},
datatypes::{DataType, Field, Int32Type, Int8Type, Schema},
record_batch::RecordBatch,
@@ -648,6 +648,8 @@ fn make_bytearray_batch(
string_values: Vec<&str>,
binary_values: Vec<&[u8]>,
fixedsize_values: Vec<&[u8; 3]>,
+ // i64 offset.
+ large_binary_values: Vec<&[u8]>,
) -> RecordBatch {
let num_rows = string_values.len();
let name: StringArray =
std::iter::repeat(Some(name)).take(num_rows).collect();
@@ -658,6 +660,8 @@ fn make_bytearray_batch(
.map(|value| Some(value.as_slice()))
.collect::<Vec<_>>()
.into();
+ let service_large_binary: LargeBinaryArray =
+ large_binary_values.iter().map(Some).collect();
let schema = Schema::new(vec![
Field::new("name", name.data_type().clone(), true),
@@ -669,6 +673,11 @@ fn make_bytearray_batch(
service_fixedsize.data_type().clone(),
true,
),
+ Field::new(
+ "service_large_binary",
+ service_large_binary.data_type().clone(),
+ true,
+ ),
]);
let schema = Arc::new(schema);
@@ -679,6 +688,7 @@ fn make_bytearray_batch(
Arc::new(service_string),
Arc::new(service_binary),
Arc::new(service_fixedsize),
+ Arc::new(service_large_binary),
],
)
.unwrap()
@@ -1000,6 +1010,13 @@ fn create_data_batch(scenario: Scenario) ->
Vec<RecordBatch> {
b"frontend five",
],
vec![b"fe1", b"fe2", b"fe3", b"fe7", b"fe5"],
+ vec![
+ b"frontend one",
+ b"frontend two",
+ b"frontend three",
+ b"frontend seven",
+ b"frontend five",
+ ],
),
make_bytearray_batch(
"mixed",
@@ -1018,6 +1035,13 @@ fn create_data_batch(scenario: Scenario) ->
Vec<RecordBatch> {
b"backend three",
],
vec![b"fe6", b"fe4", b"be1", b"be2", b"be3"],
+ vec![
+ b"frontend six",
+ b"frontend four",
+ b"backend one",
+ b"backend two",
+ b"backend three",
+ ],
),
make_bytearray_batch(
"all backends",
@@ -1036,6 +1060,13 @@ fn create_data_batch(scenario: Scenario) ->
Vec<RecordBatch> {
b"backend eight",
],
vec![b"be4", b"be5", b"be6", b"be7", b"be8"],
+ vec![
+ b"backend four",
+ b"backend five",
+ b"backend six",
+ b"backend seven",
+ b"backend eight",
+ ],
),
]
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]