This is an automated email from the ASF dual-hosted git repository.

yjshen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new e7b08ed0e Range scan support for ParquetExec (#1990)
e7b08ed0e is described below

commit e7b08ed0e735de7a831c67dc811ebba36c0e34cc
Author: Yijie Shen <[email protected]>
AuthorDate: Thu Apr 14 08:07:37 2022 +0800

    Range scan support for ParquetExec (#1990)
    
    * Filter parquet row groups by range as well
    
    * fix
    
    * WIP: case when expr works
    
    * short-circuit case_when
    
    * else
    
    * only range part
    
    * Update datafusion/core/src/datasource/listing/mod.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * test
    
    * Update parquet.rs
    
    * fix
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 ballista/rust/core/proto/ballista.proto            |  6 ++
 .../core/src/serde/physical_plan/from_proto.rs     | 14 +++-
 .../rust/core/src/serde/physical_plan/to_proto.rs  | 14 +++-
 datafusion/core/src/datasource/listing/helpers.rs  |  3 +
 datafusion/core/src/datasource/listing/mod.rs      | 29 +++++++-
 .../core/src/physical_plan/file_format/parquet.rs  | 86 ++++++++++++++++++++++
 6 files changed, 149 insertions(+), 3 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index bab783a9e..e7b1a7508 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -72,11 +72,17 @@ message Statistics {
   bool is_exact = 4;
 }
 
+message FileRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
 message PartitionedFile {
   string path = 1;
   uint64 size = 2;
   uint64 last_modified_ns = 3;
   repeated datafusion.ScalarValue partition_values = 4;
+  FileRange range = 5;
 }
 
 message CsvFormat {
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index cc7f866e9..7e759bd60 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -30,7 +30,7 @@ use chrono::{TimeZone, Utc};
 use datafusion::datafusion_data_access::{
     object_store::local::LocalFileSystem, FileMeta, SizedFile,
 };
-use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::listing::{FileRange, PartitionedFile};
 use datafusion::execution::context::ExecutionProps;
 use datafusion::logical_plan::FunctionRegistry;
 
@@ -301,6 +301,18 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
                 .iter()
                 .map(|v| v.try_into())
                 .collect::<Result<Vec<_>, _>>()?,
+            range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
+        })
+    }
+}
+
+impl TryFrom<&protobuf::FileRange> for FileRange {
+    type Error = BallistaError;
+
+    fn try_from(value: &protobuf::FileRange) -> Result<Self, Self::Error> {
+        Ok(FileRange {
+            start: value.start,
+            end: value.end,
         })
     }
 }
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 5b9fb1b25..3a1f24d0f 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -35,7 +35,7 @@ use datafusion::physical_plan::{
     Statistics,
 };
 
-use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::listing::{FileRange, PartitionedFile};
 use datafusion::physical_plan::file_format::FileScanConfig;
 
 use datafusion::physical_plan::expressions::{Count, Literal};
@@ -354,6 +354,18 @@ impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
                 .iter()
                 .map(|v| v.try_into())
                 .collect::<Result<Vec<_>, _>>()?,
+            range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
+        })
+    }
+}
+
+impl TryFrom<&FileRange> for protobuf::FileRange {
+    type Error = BallistaError;
+
+    fn try_from(value: &FileRange) -> Result<Self, Self::Error> {
+        Ok(protobuf::FileRange {
+            start: value.start,
+            end: value.end,
         })
     }
 }
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index a236bf1e9..a6fd12543 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -174,6 +174,7 @@ pub async fn pruned_partition_list(
                     Ok(PartitionedFile {
                         partition_values: vec![],
                         file_meta: f?,
+                        range: None,
                     })
                 }),
         ));
@@ -217,6 +218,7 @@ pub async fn pruned_partition_list(
                             Ok(PartitionedFile {
                                 partition_values,
                                 file_meta,
+                                range: None,
                             })
                         })
                     }
@@ -358,6 +360,7 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Vec<PartitionedFile> {
                         ScalarValue::try_from_array(batch.column(col), row).unwrap()
                     })
                     .collect(),
+                range: None,
             })
         })
         .collect()
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index 200349b5e..d7932b38f 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -32,7 +32,19 @@ pub use table::{ListingOptions, ListingTable, ListingTableConfig};
 pub type PartitionedFileStream =
     Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
 
+/// Only scan a subset of Row Groups from the Parquet file whose data "midpoint"
+/// lies within the [start, end) byte offsets. This option can be used to scan non-overlapping
+/// sections of a Parquet file in parallel.
 #[derive(Debug, Clone)]
+pub struct FileRange {
+    /// Range start
+    pub start: i64,
+    /// Range end
+    pub end: i64,
+}
+
+#[derive(Debug, Clone)]
+/// A single file or part of a file that should be read, along with its schema, statistics
 /// A single file that should be read, along with its schema, statistics
 /// and partition column values that need to be appended to each row.
 pub struct PartitionedFile {
@@ -40,7 +52,8 @@ pub struct PartitionedFile {
     pub file_meta: FileMeta,
     /// Values of partition columns to be appended to each row
     pub partition_values: Vec<ScalarValue>,
-    // We may include row group range here for a more fine-grained parallel execution
+    /// An optional file range for a more fine-grained parallel execution
+    pub range: Option<FileRange>,
 }
 
 impl PartitionedFile {
@@ -52,6 +65,19 @@ impl PartitionedFile {
                 last_modified: None,
             },
             partition_values: vec![],
+            range: None,
+        }
+    }
+
+    /// Create a file range without metadata or partition
+    pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
+        Self {
+            file_meta: FileMeta {
+                sized_file: SizedFile { path, size },
+                last_modified: None,
+            },
+            partition_values: vec![],
+            range: Some(FileRange { start, end }),
         }
     }
 }
@@ -67,5 +93,6 @@ pub fn local_unpartitioned_file(file: String) -> PartitionedFile {
     PartitionedFile {
         file_meta: local::local_unpartitioned_file(file),
         partition_values: vec![],
+        range: None,
     }
 }
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 071a07fa0..cfc99a71a 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -337,6 +337,14 @@ impl ParquetExecStream {
                 file_metrics,
             ));
         }
+        if let Some(range) = &file.range {
+            assert!(
+                range.start >= 0 && range.end > 0 && range.end > range.start,
+                "invalid range specified: {:?}",
+                range
+            );
+            opt = opt.with_range(range.start, range.end);
+        }
 
         let file_reader = SerializedFileReader::new_with_options(
             ChunkObjectReader(object_reader),
@@ -649,6 +657,7 @@ mod tests {
     };
 
     use super::*;
+    use crate::datasource::listing::FileRange;
     use crate::execution::options::CsvReadOptions;
     use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
     use arrow::array::Float32Array;
@@ -656,6 +665,7 @@ mod tests {
         array::{Int64Array, Int8Array, StringArray},
         datatypes::{DataType, Field},
     };
+    use datafusion_data_access::object_store::local;
     use datafusion_expr::{col, lit};
     use futures::StreamExt;
     use parquet::{
@@ -1099,6 +1109,81 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn parquet_exec_with_range() -> Result<()> {
+        fn file_range(file: String, start: i64, end: i64) -> PartitionedFile {
+            PartitionedFile {
+                file_meta: local::local_unpartitioned_file(file),
+                partition_values: vec![],
+                range: Some(FileRange { start, end }),
+            }
+        }
+
+        async fn assert_parquet_read(
+            file_groups: Vec<Vec<PartitionedFile>>,
+            expected_row_num: Option<usize>,
+            task_ctx: Arc<TaskContext>,
+            file_schema: SchemaRef,
+        ) -> Result<()> {
+            let parquet_exec = ParquetExec::new(
+                FileScanConfig {
+                    object_store: Arc::new(LocalFileSystem {}),
+                    file_groups,
+                    file_schema,
+                    statistics: Statistics::default(),
+                    projection: None,
+                    limit: None,
+                    table_partition_cols: vec![],
+                },
+                None,
+            );
+            assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
+            let results = parquet_exec.execute(0, task_ctx).await?.next().await;
+
+            if let Some(expected_row_num) = expected_row_num {
+                let batch = results.unwrap()?;
+                assert_eq!(expected_row_num, batch.num_rows());
+            } else {
+                assert!(results.is_none());
+            }
+
+            Ok(())
+        }
+
+        let session_ctx = SessionContext::new();
+        let testdata = crate::test_util::parquet_test_data();
+        let filename = format!("{}/alltypes_plain.parquet", testdata);
+        let file_schema = ParquetFormat::default()
+            .infer_schema(local_object_reader_stream(vec![filename.clone()]))
+            .await?;
+
+        let group_empty = vec![vec![file_range(filename.clone(), 0, 5)]];
+        let group_contain = vec![vec![file_range(filename.clone(), 5, i64::MAX)]];
+        let group_all = vec![vec![
+            file_range(filename.clone(), 0, 5),
+            file_range(filename.clone(), 5, i64::MAX),
+        ]];
+
+        assert_parquet_read(
+            group_empty,
+            None,
+            session_ctx.task_ctx(),
+            file_schema.clone(),
+        )
+        .await?;
+        assert_parquet_read(
+            group_contain,
+            Some(8),
+            session_ctx.task_ctx(),
+            file_schema.clone(),
+        )
+        .await?;
+        assert_parquet_read(group_all, Some(8), session_ctx.task_ctx(), file_schema)
+            .await?;
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn parquet_exec_with_partition() -> Result<()> {
         let session_ctx = SessionContext::new();
@@ -1171,6 +1256,7 @@ mod tests {
                 last_modified: None,
             },
             partition_values: vec![],
+            range: None,
         };
 
         let parquet_exec = ParquetExec::new(

Reply via email to