This is an automated email from the ASF dual-hosted git repository.
yjshen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new e7b08ed0e Range scan support for ParquetExec (#1990)
e7b08ed0e is described below
commit e7b08ed0e735de7a831c67dc811ebba36c0e34cc
Author: Yijie Shen <[email protected]>
AuthorDate: Thu Apr 14 08:07:37 2022 +0800
Range scan support for ParquetExec (#1990)
* Filter parquet row groups by range as well
* fix
* WIP: case when expr works
* short-circuit case_when
* else
* only range part
* Update datafusion/core/src/datasource/listing/mod.rs
Co-authored-by: Andrew Lamb <[email protected]>
* test
* Update parquet.rs
* fix
Co-authored-by: Andrew Lamb <[email protected]>
---
ballista/rust/core/proto/ballista.proto | 6 ++
.../core/src/serde/physical_plan/from_proto.rs | 14 +++-
.../rust/core/src/serde/physical_plan/to_proto.rs | 14 +++-
datafusion/core/src/datasource/listing/helpers.rs | 3 +
datafusion/core/src/datasource/listing/mod.rs | 29 +++++++-
.../core/src/physical_plan/file_format/parquet.rs | 86 ++++++++++++++++++++++
6 files changed, 149 insertions(+), 3 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index bab783a9e..e7b1a7508 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -72,11 +72,17 @@ message Statistics {
bool is_exact = 4;
}
+message FileRange {
+ int64 start = 1;
+ int64 end = 2;
+}
+
message PartitionedFile {
string path = 1;
uint64 size = 2;
uint64 last_modified_ns = 3;
repeated datafusion.ScalarValue partition_values = 4;
+ FileRange range = 5;
}
message CsvFormat {
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index cc7f866e9..7e759bd60 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -30,7 +30,7 @@ use chrono::{TimeZone, Utc};
use datafusion::datafusion_data_access::{
object_store::local::LocalFileSystem, FileMeta, SizedFile,
};
-use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::listing::{FileRange, PartitionedFile};
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_plan::FunctionRegistry;
@@ -301,6 +301,18 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
.iter()
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()?,
+ range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
+ })
+ }
+}
+
+impl TryFrom<&protobuf::FileRange> for FileRange {
+ type Error = BallistaError;
+
+ fn try_from(value: &protobuf::FileRange) -> Result<Self, Self::Error> {
+ Ok(FileRange {
+ start: value.start,
+ end: value.end,
})
}
}
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 5b9fb1b25..3a1f24d0f 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -35,7 +35,7 @@ use datafusion::physical_plan::{
Statistics,
};
-use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::listing::{FileRange, PartitionedFile};
use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::expressions::{Count, Literal};
@@ -354,6 +354,18 @@ impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
.iter()
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()?,
+ range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
+ })
+ }
+}
+
+impl TryFrom<&FileRange> for protobuf::FileRange {
+ type Error = BallistaError;
+
+ fn try_from(value: &FileRange) -> Result<Self, Self::Error> {
+ Ok(protobuf::FileRange {
+ start: value.start,
+ end: value.end,
})
}
}
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index a236bf1e9..a6fd12543 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -174,6 +174,7 @@ pub async fn pruned_partition_list(
Ok(PartitionedFile {
partition_values: vec![],
file_meta: f?,
+ range: None,
})
}),
));
@@ -217,6 +218,7 @@ pub async fn pruned_partition_list(
Ok(PartitionedFile {
partition_values,
file_meta,
+ range: None,
})
})
}
@@ -358,6 +360,7 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Vec<PartitionedFile> {
ScalarValue::try_from_array(batch.column(col), row).unwrap()
})
.collect(),
+ range: None,
})
})
.collect()
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index 200349b5e..d7932b38f 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -32,7 +32,19 @@ pub use table::{ListingOptions, ListingTable, ListingTableConfig};
pub type PartitionedFileStream =
Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
+/// Only scan a subset of Row Groups from the Parquet file whose data "midpoint"
+/// lies within the [start, end) byte offsets. This option can be used to scan non-overlapping
+/// sections of a Parquet file in parallel.
#[derive(Debug, Clone)]
+pub struct FileRange {
+ /// Range start
+ pub start: i64,
+ /// Range end
+ pub end: i64,
+}
+
+#[derive(Debug, Clone)]
+/// A single file or part of a file that should be read, along with its schema, statistics
/// A single file that should be read, along with its schema, statistics
/// and partition column values that need to be appended to each row.
pub struct PartitionedFile {
@@ -40,7 +52,8 @@ pub struct PartitionedFile {
pub file_meta: FileMeta,
/// Values of partition columns to be appended to each row
pub partition_values: Vec<ScalarValue>,
- // We may include row group range here for a more fine-grained parallel execution
+ /// An optional file range for a more fine-grained parallel execution
+ pub range: Option<FileRange>,
}
impl PartitionedFile {
@@ -52,6 +65,19 @@ impl PartitionedFile {
last_modified: None,
},
partition_values: vec![],
+ range: None,
+ }
+ }
+
+ /// Create a file range without metadata or partition
+ pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
+ Self {
+ file_meta: FileMeta {
+ sized_file: SizedFile { path, size },
+ last_modified: None,
+ },
+ partition_values: vec![],
+ range: Some(FileRange { start, end }),
}
}
}
@@ -67,5 +93,6 @@ pub fn local_unpartitioned_file(file: String) -> PartitionedFile {
PartitionedFile {
file_meta: local::local_unpartitioned_file(file),
partition_values: vec![],
+ range: None,
}
}
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 071a07fa0..cfc99a71a 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -337,6 +337,14 @@ impl ParquetExecStream {
file_metrics,
));
}
+ if let Some(range) = &file.range {
+ assert!(
+ range.start >= 0 && range.end > 0 && range.end > range.start,
+ "invalid range specified: {:?}",
+ range
+ );
+ opt = opt.with_range(range.start, range.end);
+ }
let file_reader = SerializedFileReader::new_with_options(
ChunkObjectReader(object_reader),
@@ -649,6 +657,7 @@ mod tests {
};
use super::*;
+ use crate::datasource::listing::FileRange;
use crate::execution::options::CsvReadOptions;
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use arrow::array::Float32Array;
@@ -656,6 +665,7 @@ mod tests {
array::{Int64Array, Int8Array, StringArray},
datatypes::{DataType, Field},
};
+ use datafusion_data_access::object_store::local;
use datafusion_expr::{col, lit};
use futures::StreamExt;
use parquet::{
@@ -1099,6 +1109,81 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn parquet_exec_with_range() -> Result<()> {
+ fn file_range(file: String, start: i64, end: i64) -> PartitionedFile {
+ PartitionedFile {
+ file_meta: local::local_unpartitioned_file(file),
+ partition_values: vec![],
+ range: Some(FileRange { start, end }),
+ }
+ }
+
+ async fn assert_parquet_read(
+ file_groups: Vec<Vec<PartitionedFile>>,
+ expected_row_num: Option<usize>,
+ task_ctx: Arc<TaskContext>,
+ file_schema: SchemaRef,
+ ) -> Result<()> {
+ let parquet_exec = ParquetExec::new(
+ FileScanConfig {
+ object_store: Arc::new(LocalFileSystem {}),
+ file_groups,
+ file_schema,
+ statistics: Statistics::default(),
+ projection: None,
+ limit: None,
+ table_partition_cols: vec![],
+ },
+ None,
+ );
+ assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
+ let results = parquet_exec.execute(0, task_ctx).await?.next().await;
+
+ if let Some(expected_row_num) = expected_row_num {
+ let batch = results.unwrap()?;
+ assert_eq!(expected_row_num, batch.num_rows());
+ } else {
+ assert!(results.is_none());
+ }
+
+ Ok(())
+ }
+
+ let session_ctx = SessionContext::new();
+ let testdata = crate::test_util::parquet_test_data();
+ let filename = format!("{}/alltypes_plain.parquet", testdata);
+ let file_schema = ParquetFormat::default()
+ .infer_schema(local_object_reader_stream(vec![filename.clone()]))
+ .await?;
+
+ let group_empty = vec![vec![file_range(filename.clone(), 0, 5)]];
+ let group_contain = vec![vec![file_range(filename.clone(), 5, i64::MAX)]];
+ let group_all = vec![vec![
+ file_range(filename.clone(), 0, 5),
+ file_range(filename.clone(), 5, i64::MAX),
+ ]];
+
+ assert_parquet_read(
+ group_empty,
+ None,
+ session_ctx.task_ctx(),
+ file_schema.clone(),
+ )
+ .await?;
+ assert_parquet_read(
+ group_contain,
+ Some(8),
+ session_ctx.task_ctx(),
+ file_schema.clone(),
+ )
+ .await?;
+ assert_parquet_read(group_all, Some(8), session_ctx.task_ctx(), file_schema)
+ .await?;
+
+ Ok(())
+ }
+
#[tokio::test]
async fn parquet_exec_with_partition() -> Result<()> {
let session_ctx = SessionContext::new();
@@ -1171,6 +1256,7 @@ mod tests {
last_modified: None,
},
partition_values: vec![],
+ range: None,
};
let parquet_exec = ParquetExec::new(