This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ad8d552b9f parquet: Add support for row group pruning on FixedSizeBinary (#9646)
ad8d552b9f is described below
commit ad8d552b9f150c3c066b0764e84f72b667a649ff
Author: Val Lorentz <[email protected]>
AuthorDate: Tue Mar 19 22:09:20 2024 +0100
parquet: Add support for row group pruning on FixedSizeBinary (#9646)
* Add support for row group pruning on FixedSizeBinary
* Check statistics values are valid for their type
---
.../datasource/physical_plan/parquet/row_groups.rs | 1 +
.../datasource/physical_plan/parquet/statistics.rs | 27 +++++-
datafusion/core/tests/parquet/row_group_pruning.rs | 101 +++++++++++++++++++++
3 files changed, 127 insertions(+), 2 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 9cd4699496..a82c5d97a2 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -226,6 +226,7 @@ impl PruningStatistics for BloomFilterStatistics {
match value {
ScalarValue::Utf8(Some(v)) => sbbf.check(&v.as_str()),
ScalarValue::Binary(Some(v)) => sbbf.check(v),
+ ScalarValue::FixedSizeBinary(_size, Some(v)) => sbbf.check(v),
ScalarValue::Boolean(Some(v)) => sbbf.check(v),
ScalarValue::Float64(Some(v)) => sbbf.check(v),
ScalarValue::Float32(Some(v)) => sbbf.check(v),
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 4e472606da..aac5aff80f 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -105,14 +105,20 @@ macro_rules! get_statistic {
let s = std::str::from_utf8(s.$bytes_func())
.map(|s| s.to_string())
.ok();
+ if s.is_none() {
+ log::debug!(
+ "Utf8 statistics is a non-UTF8 value, ignoring it."
+ );
+ }
Some(ScalarValue::Utf8(s))
}
}
}
- // type not supported yet
+ // type not fully supported yet
ParquetStatistics::FixedLenByteArray(s) => {
match $target_arrow_type {
- // just support the decimal data type
+ // just support specific logical data types, there are others each
+ // with their own ordering
Some(DataType::Decimal128(precision, scale)) => {
Some(ScalarValue::Decimal128(
Some(from_bytes_to_i128(s.$bytes_func())),
@@ -120,6 +126,23 @@ macro_rules! get_statistic {
*scale,
))
}
+ Some(DataType::FixedSizeBinary(size)) => {
+ let value = s.$bytes_func().to_vec();
+ let value = if value.len().try_into() == Ok(*size) {
+ Some(value)
+ } else {
+ log::debug!(
+ "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.",
+ size,
+ value.len(),
+ );
+ None
+ };
+ Some(ScalarValue::FixedSizeBinary(
+ *size,
+ value,
+ ))
+ }
_ => None,
}
}
diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs
index 5511219350..ed48d04064 100644
--- a/datafusion/core/tests/parquet/row_group_pruning.rs
+++ b/datafusion/core/tests/parquet/row_group_pruning.rs
@@ -948,6 +948,107 @@ async fn prune_binary_lt() {
.await;
}
+#[tokio::test]
+async fn prune_fixedsizebinary_eq_match() {
+ RowGroupPruningTest::new()
+ .with_scenario(Scenario::ByteArray)
+ .with_query(
+ "SELECT name, service_fixedsize FROM t WHERE service_fixedsize = ARROW_CAST(CAST('fe6' AS bytea), 'FixedSizeBinary(3)')",
+ )
+ .with_expected_errors(Some(0))
+ // false positive on 'all frontends' batch: 'fe1' < 'fe6' < 'fe7'
+ .with_matched_by_stats(Some(2))
+ .with_pruned_by_stats(Some(1))
+ .with_matched_by_bloom_filter(Some(1))
+ .with_pruned_by_bloom_filter(Some(1))
+ .with_expected_rows(1)
+ .test_row_group_prune()
+ .await;
+
+ RowGroupPruningTest::new()
+ .with_scenario(Scenario::ByteArray)
+ .with_query(
+ "SELECT name, service_fixedsize FROM t WHERE service_fixedsize = ARROW_CAST(CAST('fe6' AS bytea), 'FixedSizeBinary(3)')",
+ )
+ .with_expected_errors(Some(0))
+ // false positive on 'all frontends' batch: 'fe1' < 'fe6' < 'fe7'
+ .with_matched_by_stats(Some(2))
+ .with_pruned_by_stats(Some(1))
+ .with_matched_by_bloom_filter(Some(1))
+ .with_pruned_by_bloom_filter(Some(1))
+ .with_expected_rows(1)
+ .test_row_group_prune()
+ .await;
+}
+
+#[tokio::test]
+async fn prune_fixedsizebinary_eq_no_match() {
+ RowGroupPruningTest::new()
+ .with_scenario(Scenario::ByteArray)
+ .with_query(
+ "SELECT name, service_fixedsize FROM t WHERE service_fixedsize = ARROW_CAST(CAST('be9' AS bytea), 'FixedSizeBinary(3)')",
+ )
+ .with_expected_errors(Some(0))
+ // false positive on 'mixed' batch: 'be1' < 'be9' < 'fe4'
+ .with_matched_by_stats(Some(1))
+ .with_pruned_by_stats(Some(2))
+ .with_matched_by_bloom_filter(Some(0))
+ .with_pruned_by_bloom_filter(Some(1))
+ .with_expected_rows(0)
+ .test_row_group_prune()
+ .await;
+}
+
+#[tokio::test]
+async fn prune_fixedsizebinary_neq() {
+ RowGroupPruningTest::new()
+ .with_scenario(Scenario::ByteArray)
+ .with_query(
+ "SELECT name, service_fixedsize FROM t WHERE service_fixedsize != ARROW_CAST(CAST('be1' AS bytea), 'FixedSizeBinary(3)')",
+ )
+ .with_expected_errors(Some(0))
+ .with_matched_by_stats(Some(3))
+ .with_pruned_by_stats(Some(0))
+ .with_matched_by_bloom_filter(Some(3))
+ .with_pruned_by_bloom_filter(Some(0))
+ .with_expected_rows(14)
+ .test_row_group_prune()
+ .await;
+}
+
+#[tokio::test]
+async fn prune_fixedsizebinary_lt() {
+ RowGroupPruningTest::new()
+ .with_scenario(Scenario::ByteArray)
+ .with_query(
+ "SELECT name, service_fixedsize FROM t WHERE service_fixedsize < ARROW_CAST(CAST('be3' AS bytea), 'FixedSizeBinary(3)')",
+ )
+ .with_expected_errors(Some(0))
+ // matches 'all backends' only
+ .with_matched_by_stats(Some(1))
+ .with_pruned_by_stats(Some(2))
+ .with_matched_by_bloom_filter(Some(0))
+ .with_pruned_by_bloom_filter(Some(0))
+ .with_expected_rows(2)
+ .test_row_group_prune()
+ .await;
+
+ RowGroupPruningTest::new()
+ .with_scenario(Scenario::ByteArray)
+ .with_query(
+ "SELECT name, service_fixedsize FROM t WHERE service_fixedsize < ARROW_CAST(CAST('be9' AS bytea), 'FixedSizeBinary(3)')",
+ )
+ .with_expected_errors(Some(0))
+ .with_matched_by_stats(Some(2))
+ .with_pruned_by_stats(Some(1))
+ .with_matched_by_bloom_filter(Some(0))
+ .with_pruned_by_bloom_filter(Some(0))
+ // all backends from 'mixed' and 'all backends'
+ .with_expected_rows(8)
+ .test_row_group_prune()
+ .await;
+}
+
#[tokio::test]
async fn prune_periods_in_column_names() {
// There are three row groups for "service.name", each with 5 rows = 15
rows total