This is an automated email from the ASF dual-hosted git repository.

jorgecarleitao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 1378c20  ARROW-10874: [Rust][DataFusion] Add statistics for MemTable, 
change statistics struct
1378c20 is described below

commit 1378c20bad350dba484ae21aa8a2588f44012452
Author: Heres, Daniel <[email protected]>
AuthorDate: Sat Dec 12 05:46:01 2020 +0100

    ARROW-10874: [Rust][DataFusion] Add statistics for MemTable, change 
statistics struct
    
    This adds the `num_rows` for  `MemTable` (which would allow optimizations 
for e.g. joins).
    This also changes the trait to allow to have individual stats optional 
instead of "all or nothing".
    
    I think it is common that you know only some aspects of a data source, 
while not knowing the other without reading the entire dataset, that's why I 
think the `Statistics` should be changed to take optional items instead.
    
    Closes #8889 from Dandandan/mem_table_statistics
    
    Authored-by: Heres, Daniel <[email protected]>
    Signed-off-by: Jorge C. Leitao <[email protected]>
---
 rust/datafusion/src/datasource/csv.rs        |  6 +++---
 rust/datafusion/src/datasource/datasource.rs |  8 ++++----
 rust/datafusion/src/datasource/memory.rs     | 23 ++++++++++++++++-------
 rust/datafusion/src/datasource/parquet.rs    |  6 +++---
 rust/datafusion/tests/dataframe.rs           |  4 ++--
 5 files changed, 28 insertions(+), 19 deletions(-)

diff --git a/rust/datafusion/src/datasource/csv.rs 
b/rust/datafusion/src/datasource/csv.rs
index c2b9b57..8fc27bd 100644
--- a/rust/datafusion/src/datasource/csv.rs
+++ b/rust/datafusion/src/datasource/csv.rs
@@ -53,7 +53,7 @@ pub struct CsvFile {
     has_header: bool,
     delimiter: u8,
     file_extension: String,
-    statistics: Option<Statistics>,
+    statistics: Statistics,
 }
 
 impl CsvFile {
@@ -77,7 +77,7 @@ impl CsvFile {
             has_header: options.has_header,
             delimiter: options.delimiter,
             file_extension: String::from(options.file_extension),
-            statistics: None,
+            statistics: Statistics::default(),
         })
     }
 }
@@ -108,7 +108,7 @@ impl TableProvider for CsvFile {
         )?))
     }
 
-    fn statistics(&self) -> Option<Statistics> {
+    fn statistics(&self) -> Statistics {
         self.statistics.clone()
     }
 }
diff --git a/rust/datafusion/src/datasource/datasource.rs 
b/rust/datafusion/src/datasource/datasource.rs
index 5882336..af77346 100644
--- a/rust/datafusion/src/datasource/datasource.rs
+++ b/rust/datafusion/src/datasource/datasource.rs
@@ -26,12 +26,12 @@ use crate::physical_plan::ExecutionPlan;
 
 /// This table statistics are estimates.
 /// It can not be used directly in the precise compute
-#[derive(Clone)]
+#[derive(Clone, Default)]
 pub struct Statistics {
     /// The number of table rows
-    pub num_rows: usize,
+    pub num_rows: Option<usize>,
     /// total byte of the table rows
-    pub total_byte_size: usize,
+    pub total_byte_size: Option<usize>,
 }
 
 /// Source table
@@ -52,5 +52,5 @@ pub trait TableProvider {
 
     /// Returns the table Statistics
     /// Statistics should be optional because not all data sources can provide 
statistics.
-    fn statistics(&self) -> Option<Statistics>;
+    fn statistics(&self) -> Statistics;
 }
diff --git a/rust/datafusion/src/datasource/memory.rs 
b/rust/datafusion/src/datasource/memory.rs
index 5b4f9fd..f00a381 100644
--- a/rust/datafusion/src/datasource/memory.rs
+++ b/rust/datafusion/src/datasource/memory.rs
@@ -36,21 +36,28 @@ use crate::physical_plan::ExecutionPlan;
 pub struct MemTable {
     schema: SchemaRef,
     batches: Vec<Vec<RecordBatch>>,
-    statistics: Option<Statistics>,
+    statistics: Statistics,
 }
 
 impl MemTable {
     /// Create a new in-memory table from the provided schema and record 
batches
     pub fn try_new(schema: SchemaRef, partitions: Vec<Vec<RecordBatch>>) -> 
Result<Self> {
-        if partitions.iter().all(|partition| {
-            partition
+        if partitions
+            .iter()
+            .flatten()
+            .all(|batches| batches.schema() == schema)
+        {
+            let num_rows: usize = partitions
                 .iter()
-                .all(|batches| batches.schema().as_ref() == schema.as_ref())
-        }) {
+                .flat_map(|batches| batches.iter().map(RecordBatch::num_rows))
+                .sum();
             Ok(Self {
                 schema,
                 batches: partitions,
-                statistics: None,
+                statistics: Statistics {
+                    num_rows: Some(num_rows),
+                    total_byte_size: None,
+                },
             })
         } else {
             Err(DataFusionError::Plan(
@@ -136,7 +143,7 @@ impl TableProvider for MemTable {
         )?))
     }
 
-    fn statistics(&self) -> Option<Statistics> {
+    fn statistics(&self) -> Statistics {
         self.statistics.clone()
     }
 }
@@ -167,6 +174,8 @@ mod tests {
 
         let provider = MemTable::try_new(schema, vec![vec![batch]])?;
 
+        assert_eq!(provider.statistics().num_rows, Some(3));
+
         // scan with projection
         let exec = provider.scan(&Some(vec![2, 1]), 1024)?;
         let mut it = exec.execute(0).await?;
diff --git a/rust/datafusion/src/datasource/parquet.rs 
b/rust/datafusion/src/datasource/parquet.rs
index 6d7ec02..9ac9c7b 100644
--- a/rust/datafusion/src/datasource/parquet.rs
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -33,7 +33,7 @@ use crate::physical_plan::ExecutionPlan;
 pub struct ParquetTable {
     path: String,
     schema: SchemaRef,
-    statistics: Option<Statistics>,
+    statistics: Statistics,
 }
 
 impl ParquetTable {
@@ -44,7 +44,7 @@ impl ParquetTable {
         Ok(Self {
             path: path.to_string(),
             schema,
-            statistics: None,
+            statistics: Statistics::default(),
         })
     }
 }
@@ -73,7 +73,7 @@ impl TableProvider for ParquetTable {
         )?))
     }
 
-    fn statistics(&self) -> Option<Statistics> {
+    fn statistics(&self) -> Statistics {
         self.statistics.clone()
     }
 }
diff --git a/rust/datafusion/tests/dataframe.rs 
b/rust/datafusion/tests/dataframe.rs
index 6bf6c23..472de2d 100644
--- a/rust/datafusion/tests/dataframe.rs
+++ b/rust/datafusion/tests/dataframe.rs
@@ -147,8 +147,8 @@ impl TableProvider for CustomTableProvider {
         }))
     }
 
-    fn statistics(&self) -> Option<Statistics> {
-        None
+    fn statistics(&self) -> Statistics {
+        Statistics::default()
     }
 }
 

Reply via email to