This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a2c9d1a8ba Minor: Return option from row_group_row_count (#10973)
a2c9d1a8ba is described below
commit a2c9d1a8ba4445dec2a37df5a3fdd018158b91a6
Author: Marvin Lanhenke <[email protected]>
AuthorDate: Tue Jun 18 16:36:00 2024 +0200
Minor: Return option from row_group_row_count (#10973)
* refactor: return Option from row_group_row_count
* fix: doctest
---
datafusion-examples/examples/parquet_index.rs | 6 +++++-
datafusion/core/benches/parquet_statistic.rs | 3 +--
.../datasource/physical_plan/parquet/row_groups.rs | 6 ++++--
.../datasource/physical_plan/parquet/statistics.rs | 20 +++++++++++++----
datafusion/core/tests/parquet/arrow_statistics.rs | 25 ++++------------------
5 files changed, 30 insertions(+), 30 deletions(-)
diff --git a/datafusion-examples/examples/parquet_index.rs b/datafusion-examples/examples/parquet_index.rs
index e3387117c9..668eda0474 100644
--- a/datafusion-examples/examples/parquet_index.rs
+++ b/datafusion-examples/examples/parquet_index.rs
@@ -526,7 +526,11 @@ impl ParquetMetadataIndexBuilder {
reader.schema(),
reader.parquet_schema(),
)?;
- let row_counts = StatisticsConverter::row_group_row_counts(row_groups.iter())?;
+ let row_counts = converter
+ .row_group_row_counts(row_groups.iter())?
+ .ok_or_else(|| {
+ internal_datafusion_err!("Row group row counts are missing")
+ })?;
let value_column_mins = converter.row_group_mins(row_groups.iter())?;
let value_column_maxes = converter.row_group_maxes(row_groups.iter())?;
diff --git a/datafusion/core/benches/parquet_statistic.rs b/datafusion/core/benches/parquet_statistic.rs
index 5fd6b0066e..b58ecc13ae 100644
--- a/datafusion/core/benches/parquet_statistic.rs
+++ b/datafusion/core/benches/parquet_statistic.rs
@@ -175,8 +175,7 @@ fn criterion_benchmark(c: &mut Criterion) {
let _ = converter.row_group_mins(row_groups.iter()).unwrap();
let _ = converter.row_group_maxes(row_groups.iter()).unwrap();
let _ = converter.row_group_null_counts(row_groups.iter()).unwrap();
- let _ = StatisticsConverter::row_group_row_counts(row_groups.iter())
-     .unwrap();
+ let _ = converter.row_group_row_counts(row_groups.iter()).unwrap();
})
},
);
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index f8e4889f0b..e590f37225 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -384,10 +384,12 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
.map(|counts| Arc::new(counts) as ArrayRef)
}
- fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
+ fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
// row counts are the same for all columns in a row group
- StatisticsConverter::row_group_row_counts(self.metadata_iter())
+ self.statistics_converter(column)
+ .and_then(|c| c.row_group_row_counts(self.metadata_iter()))
.ok()
+ .flatten()
.map(|counts| Arc::new(counts) as ArrayRef)
}
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 14d7bc2af4..6ad78a82b9 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -718,21 +718,33 @@ impl<'a> StatisticsConverter<'a> {
///
/// # Example
/// ```no_run
+ /// # use arrow::datatypes::Schema;
+ /// # use arrow_array::ArrayRef;
/// # use parquet::file::metadata::ParquetMetaData;
/// # use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
/// # fn get_parquet_metadata() -> ParquetMetaData { unimplemented!() }
- /// // Given the metadata for a parquet file
+ /// # fn get_arrow_schema() -> Schema { unimplemented!() }
+ /// // Given the metadata for a parquet file and the arrow schema
/// let metadata: ParquetMetaData = get_parquet_metadata();
+ /// let arrow_schema: Schema = get_arrow_schema();
+ /// let parquet_schema = metadata.file_metadata().schema_descr();
+ /// // create a converter
+ /// let converter = StatisticsConverter::try_new("foo", &arrow_schema, parquet_schema)
+ /// .unwrap();
/// // get the row counts for each row group
- /// let row_counts = StatisticsConverter::row_group_row_counts(metadata
+ /// let row_counts = converter.row_group_row_counts(metadata
/// .row_groups()
/// .iter()
/// );
/// ```
- pub fn row_group_row_counts<I>(metadatas: I) -> Result<UInt64Array>
+ pub fn row_group_row_counts<I>(&self, metadatas: I) -> Result<Option<UInt64Array>>
where
I: IntoIterator<Item = &'a RowGroupMetaData>,
{
+ let Some(_) = self.parquet_index else {
+ return Ok(None);
+ };
+
let mut builder = UInt64Array::builder(10);
for metadata in metadatas.into_iter() {
let row_count = metadata.num_rows();
@@ -743,7 +755,7 @@ impl<'a> StatisticsConverter<'a> {
})?;
builder.append_value(row_count);
}
- Ok(builder.finish())
+ Ok(Some(builder.finish()))
}
/// Create a new `StatisticsConverter` to extract statistics for a column
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index cd0efc8d35..4c68a57333 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -325,11 +325,9 @@ impl<'a> Test<'a> {
Actual: {null_counts:?}. Expected: {expected_null_counts:?}"
);
- let row_counts = StatisticsConverter::row_group_row_counts(
- reader.metadata().row_groups().iter(),
- )
- .unwrap();
- let row_counts = Some(row_counts);
+ let row_counts = converter
+ .row_group_row_counts(reader.metadata().row_groups().iter())
+ .unwrap();
assert_eq!(
row_counts, expected_row_counts,
"{column_name}: Mismatch with expected row counts. \
@@ -2001,21 +1999,6 @@ async fn test_column_non_existent() {
.build()
.await;
- Test {
- reader: &reader,
- // mins are [-5, -4, 0, 5]
- expected_min: Arc::new(Int64Array::from(vec![None, None, None, None])),
- // maxes are [-1, 0, 4, 9]
- expected_max: Arc::new(Int64Array::from(vec![None, None, None, None])),
- // nulls are [0, 0, 0, 0]
- expected_null_counts: UInt64Array::from(vec![None, None, None, None]),
- // row counts are [5, 5, 5, 5]
- expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
- column_name: "i_do_not_exist",
- check: Check::RowGroup,
- }
- .run_with_schema(&schema);
-
Test {
reader: &reader,
// mins are [-5, -4, 0, 5]
@@ -2027,7 +2010,7 @@ async fn test_column_non_existent() {
// row counts are [5, 5, 5, 5]
expected_row_counts: None,
column_name: "i_do_not_exist",
- check: Check::DataPage,
+ check: Check::Both,
}
.run_with_schema(&schema);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]