This is an automated email from the ASF dual-hosted git repository.
jorgecarleitao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 1378c20 ARROW-10874: [Rust][DataFusion] Add statistics for MemTable,
change statistics struct
1378c20 is described below
commit 1378c20bad350dba484ae21aa8a2588f44012452
Author: Heres, Daniel <[email protected]>
AuthorDate: Sat Dec 12 05:46:01 2020 +0100
ARROW-10874: [Rust][DataFusion] Add statistics for MemTable, change
statistics struct
This adds the `num_rows` statistic for `MemTable` (which would allow
optimizations, e.g. for joins).
This also changes the trait so that individual statistics are optional,
instead of "all or nothing".
I think it is common to know only some aspects of a data source while
not knowing the others without reading the entire dataset; that's why I
think `Statistics` should be changed to hold optional fields instead.
Closes #8889 from Dandandan/mem_table_statistics
Authored-by: Heres, Daniel <[email protected]>
Signed-off-by: Jorge C. Leitao <[email protected]>
---
rust/datafusion/src/datasource/csv.rs | 6 +++---
rust/datafusion/src/datasource/datasource.rs | 8 ++++----
rust/datafusion/src/datasource/memory.rs | 23 ++++++++++++++++-------
rust/datafusion/src/datasource/parquet.rs | 6 +++---
rust/datafusion/tests/dataframe.rs | 4 ++--
5 files changed, 28 insertions(+), 19 deletions(-)
diff --git a/rust/datafusion/src/datasource/csv.rs
b/rust/datafusion/src/datasource/csv.rs
index c2b9b57..8fc27bd 100644
--- a/rust/datafusion/src/datasource/csv.rs
+++ b/rust/datafusion/src/datasource/csv.rs
@@ -53,7 +53,7 @@ pub struct CsvFile {
has_header: bool,
delimiter: u8,
file_extension: String,
- statistics: Option<Statistics>,
+ statistics: Statistics,
}
impl CsvFile {
@@ -77,7 +77,7 @@ impl CsvFile {
has_header: options.has_header,
delimiter: options.delimiter,
file_extension: String::from(options.file_extension),
- statistics: None,
+ statistics: Statistics::default(),
})
}
}
@@ -108,7 +108,7 @@ impl TableProvider for CsvFile {
)?))
}
- fn statistics(&self) -> Option<Statistics> {
+ fn statistics(&self) -> Statistics {
self.statistics.clone()
}
}
diff --git a/rust/datafusion/src/datasource/datasource.rs
b/rust/datafusion/src/datasource/datasource.rs
index 5882336..af77346 100644
--- a/rust/datafusion/src/datasource/datasource.rs
+++ b/rust/datafusion/src/datasource/datasource.rs
@@ -26,12 +26,12 @@ use crate::physical_plan::ExecutionPlan;
/// This table statistics are estimates.
/// It can not be used directly in the precise compute
-#[derive(Clone)]
+#[derive(Clone, Default)]
pub struct Statistics {
/// The number of table rows
- pub num_rows: usize,
+ pub num_rows: Option<usize>,
/// total byte of the table rows
- pub total_byte_size: usize,
+ pub total_byte_size: Option<usize>,
}
/// Source table
@@ -52,5 +52,5 @@ pub trait TableProvider {
/// Returns the table Statistics
/// Statistics should be optional because not all data sources can provide
statistics.
- fn statistics(&self) -> Option<Statistics>;
+ fn statistics(&self) -> Statistics;
}
diff --git a/rust/datafusion/src/datasource/memory.rs
b/rust/datafusion/src/datasource/memory.rs
index 5b4f9fd..f00a381 100644
--- a/rust/datafusion/src/datasource/memory.rs
+++ b/rust/datafusion/src/datasource/memory.rs
@@ -36,21 +36,28 @@ use crate::physical_plan::ExecutionPlan;
pub struct MemTable {
schema: SchemaRef,
batches: Vec<Vec<RecordBatch>>,
- statistics: Option<Statistics>,
+ statistics: Statistics,
}
impl MemTable {
/// Create a new in-memory table from the provided schema and record
batches
pub fn try_new(schema: SchemaRef, partitions: Vec<Vec<RecordBatch>>) ->
Result<Self> {
- if partitions.iter().all(|partition| {
- partition
+ if partitions
+ .iter()
+ .flatten()
+ .all(|batches| batches.schema() == schema)
+ {
+ let num_rows: usize = partitions
.iter()
- .all(|batches| batches.schema().as_ref() == schema.as_ref())
- }) {
+ .flat_map(|batches| batches.iter().map(RecordBatch::num_rows))
+ .sum();
Ok(Self {
schema,
batches: partitions,
- statistics: None,
+ statistics: Statistics {
+ num_rows: Some(num_rows),
+ total_byte_size: None,
+ },
})
} else {
Err(DataFusionError::Plan(
@@ -136,7 +143,7 @@ impl TableProvider for MemTable {
)?))
}
- fn statistics(&self) -> Option<Statistics> {
+ fn statistics(&self) -> Statistics {
self.statistics.clone()
}
}
@@ -167,6 +174,8 @@ mod tests {
let provider = MemTable::try_new(schema, vec![vec![batch]])?;
+ assert_eq!(provider.statistics().num_rows, Some(3));
+
// scan with projection
let exec = provider.scan(&Some(vec![2, 1]), 1024)?;
let mut it = exec.execute(0).await?;
diff --git a/rust/datafusion/src/datasource/parquet.rs
b/rust/datafusion/src/datasource/parquet.rs
index 6d7ec02..9ac9c7b 100644
--- a/rust/datafusion/src/datasource/parquet.rs
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -33,7 +33,7 @@ use crate::physical_plan::ExecutionPlan;
pub struct ParquetTable {
path: String,
schema: SchemaRef,
- statistics: Option<Statistics>,
+ statistics: Statistics,
}
impl ParquetTable {
@@ -44,7 +44,7 @@ impl ParquetTable {
Ok(Self {
path: path.to_string(),
schema,
- statistics: None,
+ statistics: Statistics::default(),
})
}
}
@@ -73,7 +73,7 @@ impl TableProvider for ParquetTable {
)?))
}
- fn statistics(&self) -> Option<Statistics> {
+ fn statistics(&self) -> Statistics {
self.statistics.clone()
}
}
diff --git a/rust/datafusion/tests/dataframe.rs
b/rust/datafusion/tests/dataframe.rs
index 6bf6c23..472de2d 100644
--- a/rust/datafusion/tests/dataframe.rs
+++ b/rust/datafusion/tests/dataframe.rs
@@ -147,8 +147,8 @@ impl TableProvider for CustomTableProvider {
}))
}
- fn statistics(&self) -> Option<Statistics> {
- None
+ fn statistics(&self) -> Statistics {
+ Statistics::default()
}
}