rdettai commented on a change in pull request #1141:
URL: https://github.com/apache/arrow-datafusion/pull/1141#discussion_r738254353
##########
File path: ballista/rust/core/proto/ballista.proto
##########
@@ -613,33 +614,28 @@ message ScanLimit {
uint32 limit = 1;
}
-message ParquetScanExecNode {
+message FileScanExecConf {
Review comment:
Moved the `PhysicalPlanConfig` entity to `physical_plan/file_format`,
which lets us factor out a lot of configuration that is the same for all
file formats. Here, for instance, we serialize the configuration in one
common entity to avoid repeating the same parameters in every
`XxxScanExecConf`.
The rationale for doing this is that clippy was complaining more and
more often that methods had too many arguments 😉
Suggestions on the naming of the config entity (`PhysicalPlanConfig`) or the
serialized version (`FileScanExecConf`) are welcome 😄
##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -128,8 +130,8 @@ pub async fn get_statistics_with_limit(
pub struct PartitionedFile {
/// Path for the file (e.g. URL, filesystem path, etc)
pub file_meta: FileMeta,
- // Values of partition columns to be appended to each row
- // pub partition_value: Option<Vec<ScalarValue>>,
+ /// Values of partition columns to be appended to each row
+ pub partition_values: Vec<ScalarValue>,
Review comment:
The scalar value is always `Utf8` for now in the `ListingTable`
implementation. Even though I usually prefer to apply YAGNI in these cases,
this is a core abstraction that might be used by custom table providers, and
those might want to allow stronger typing for their partition columns.
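
As a rough illustration of what "stronger typing" could look like for a custom table provider, here is a minimal sketch. `PartitionValue`, `PartitionType` and `parse_partition_value` are hypothetical names invented for this example; they are simplified stand-ins, not the DataFusion `ScalarValue` API.

```rust
/// Hypothetical, simplified stand-in for `ScalarValue` (illustration only).
#[derive(Debug, Clone, PartialEq)]
pub enum PartitionValue {
    Utf8(String),
    Int64(i64),
}

/// Hypothetical type tag that a custom provider might declare per partition column.
#[derive(Debug, Clone, Copy)]
pub enum PartitionType {
    Utf8,
    Int64,
}

/// Parse the raw string found in a path into a typed partition value.
/// Returns `None` when the raw string does not match the declared type.
pub fn parse_partition_value(raw: &str, ty: PartitionType) -> Option<PartitionValue> {
    match ty {
        PartitionType::Utf8 => Some(PartitionValue::Utf8(raw.to_string())),
        PartitionType::Int64 => raw.parse().ok().map(PartitionValue::Int64),
    }
}
```

With a `Vec` of such values per file, a provider that only needs strings keeps using the `Utf8` variant, while another one can opt into a typed variant without changing the shape of `PartitionedFile`.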
##########
File path: ballista/rust/core/proto/ballista.proto
##########
@@ -294,7 +295,7 @@ message ListingTableScanNode {
ProjectionColumns projection = 4;
Schema schema = 5;
repeated LogicalExprNode filters = 6;
- repeated string partitions = 7;
+ repeated string table_partition_cols = 7;
Review comment:
Renamed the `partitions` field to `table_partition_cols` to make it more
explicit (cf
https://github.com/apache/arrow-datafusion/pull/1141#discussion_r735280489)
##########
File path: datafusion/src/physical_plan/file_format/mod.rs
##########
@@ -24,19 +24,134 @@ mod json;
mod parquet;
pub use self::parquet::ParquetExec;
+use arrow::{
+ array::{ArrayData, ArrayRef, DictionaryArray, UInt8BufferBuilder},
+ buffer::Buffer,
+ datatypes::{DataType, Field, Schema, SchemaRef, UInt8Type},
+ error::{ArrowError, Result as ArrowResult},
+ record_batch::RecordBatch,
+};
pub use avro::AvroExec;
pub use csv::CsvExec;
pub use json::NdJsonExec;
-use crate::datasource::PartitionedFile;
-use std::fmt::{Display, Formatter, Result};
+use crate::{
+ datasource::{object_store::ObjectStore, PartitionedFile},
+ scalar::ScalarValue,
+};
+use std::{
+ collections::HashMap,
+ fmt::{Display, Formatter, Result as FmtResult},
+ sync::Arc,
+ vec,
+};
+
+use super::{ColumnStatistics, Statistics};
+
+lazy_static! {
+ /// The datatype used for all partitioning columns for now
+ pub static ref DEFAULT_PARTITION_COLUMN_DATATYPE: DataType = DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
Review comment:
For now the file format executors only support string partition columns,
encoded as dictionaries to optimize space usage (partition columns have
a constant value for every row of a file, which is perfect for dictionary
encoding).
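
To make the space argument concrete, here is a minimal std-only sketch of a dictionary-encoded constant column. `DictColumn` is a hypothetical toy type written for this comment, not the arrow `DictionaryArray`; it only mirrors the idea of `UInt8` keys pointing into a `Utf8` dictionary.

```rust
/// Toy dictionary-encoded string column: `keys[i]` indexes into `values`.
/// (Hypothetical type for illustration; not the arrow `DictionaryArray`.)
#[derive(Debug, Clone, PartialEq)]
pub struct DictColumn {
    pub keys: Vec<u8>,
    pub values: Vec<String>,
}

impl DictColumn {
    /// Encode a column whose value is the same for every row:
    /// the dictionary holds a single entry and every key is 0.
    pub fn constant(value: &str, num_rows: usize) -> Self {
        DictColumn {
            keys: vec![0u8; num_rows],
            values: vec![value.to_string()],
        }
    }

    /// Decode row `i` back to its string value, if the row exists.
    pub fn get(&self, i: usize) -> Option<&str> {
        let key = *self.keys.get(i)? as usize;
        self.values.get(key).map(String::as_str)
    }

    /// Approximate payload size in bytes: one key byte per row
    /// plus the bytes of the dictionary strings.
    pub fn payload_bytes(&self) -> usize {
        self.keys.len() + self.values.iter().map(String::len).sum::<usize>()
    }
}
```

For 1000 rows of a 10-byte partition value, this layout stores 1000 key bytes plus the string once, instead of repeating the string on every row.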
##########
File path: datafusion/src/datasource/file_format/mod.rs
##########
@@ -29,33 +29,12 @@ use std::sync::Arc;
use crate::arrow::datatypes::SchemaRef;
use crate::error::Result;
use crate::logical_plan::Expr;
+use crate::physical_plan::file_format::PhysicalPlanConfig;
use crate::physical_plan::{ExecutionPlan, Statistics};
use async_trait::async_trait;
-use super::object_store::{ObjectReader, ObjectReaderStream, ObjectStore};
-use super::PartitionedFile;
-
-/// The configurations to be passed when creating a physical plan for
-/// a given file format.
-pub struct PhysicalPlanConfig {
Review comment:
Moved this to `physical_plan/file_format/mod.rs`, as it is a means of
factoring out the inputs common to the file format execution plans.
##########
File path: datafusion/src/datasource/listing/mod.rs
##########
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A table that uses the `ObjectStore` listing capability
+//! to get the list of files to process.
+
+mod helpers;
+mod table;
Review comment:
The `listing.rs` file was becoming huge, so it was split into two files.
This also helps keep tests closer to the code. An unfortunate side
effect is that we lose the git history for some of the code that was moved to
`helpers.rs`.
##########
File path: datafusion/src/physical_plan/file_format/mod.rs
##########
@@ -50,3 +165,260 @@ impl<'a> Display for FileGroupsDisplay<'a> {
write!(f, "[{}]", parts.join(", "))
}
}
+
+/// A helper that projects partition columns into the file record batches
+struct PartitionColumnProjector {
Review comment:
`PartitionColumnProjector` helps inject partition columns into file
record batches, both in `FileStream` and in `ParquetExec`. One interesting
trick is the use of a cache for the key part of the partition column
dictionaries. These columns are constant, so the dictionaries that
encode them have all their keys equal to 0, which means we can re-use the same
"all-zero" buffer across batches. This makes the space consumption of the
partition columns O(batch_size) instead of O(record_count).
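
The caching idea can be sketched with the standard library alone. `ZeroKeyCache` is a hypothetical name used only for this example, and the sketch makes one simplifying assumption: it reallocates whenever the requested length changes, which is not necessarily how the code in this PR handles a shorter last batch.

```rust
use std::sync::Arc;

/// Hypothetical cache for the all-zero key buffer of constant
/// dictionary columns (illustration only, not the PR's actual type).
#[derive(Default)]
pub struct ZeroKeyCache {
    buffer: Option<Arc<[u8]>>,
}

impl ZeroKeyCache {
    /// Return a shared buffer of `len` zero keys.
    /// The same allocation is handed out as long as `len` does not change.
    pub fn keys(&mut self, len: usize) -> Arc<[u8]> {
        if let Some(buf) = &self.buffer {
            if buf.len() == len {
                // Cache hit: only bump the reference count.
                return Arc::clone(buf);
            }
        }
        // Cache miss: allocate a fresh zeroed buffer and remember it.
        let fresh: Arc<[u8]> = vec![0u8; len].into();
        self.buffer = Some(Arc::clone(&fresh));
        fresh
    }
}
```

Every batch of the same size then shares one key allocation, so the memory held for keys is bounded by the batch size rather than by the total number of records.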
##########
File path: datafusion/src/physical_plan/file_format/mod.rs
##########
@@ -24,19 +24,134 @@ mod json;
mod parquet;
pub use self::parquet::ParquetExec;
+use arrow::{
+ array::{ArrayData, ArrayRef, DictionaryArray, UInt8BufferBuilder},
+ buffer::Buffer,
+ datatypes::{DataType, Field, Schema, SchemaRef, UInt8Type},
+ error::{ArrowError, Result as ArrowResult},
+ record_batch::RecordBatch,
+};
pub use avro::AvroExec;
pub use csv::CsvExec;
pub use json::NdJsonExec;
-use crate::datasource::PartitionedFile;
-use std::fmt::{Display, Formatter, Result};
+use crate::{
+ datasource::{object_store::ObjectStore, PartitionedFile},
+ scalar::ScalarValue,
+};
+use std::{
+ collections::HashMap,
+ fmt::{Display, Formatter, Result as FmtResult},
+ sync::Arc,
+ vec,
+};
+
+use super::{ColumnStatistics, Statistics};
+
+lazy_static! {
+ /// The datatype used for all partitioning columns for now
+ pub static ref DEFAULT_PARTITION_COLUMN_DATATYPE: DataType = DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
+}
+
+/// The base configurations to provide when creating a physical plan for
+/// any given file format.
+#[derive(Debug, Clone)]
+pub struct PhysicalPlanConfig {
+ /// Store from which the `files` should be fetched
+ pub object_store: Arc<dyn ObjectStore>,
+ /// Schema before projection. It contains the columns that are expected
+ /// to be in the files without the table partition columns.
+ pub file_schema: SchemaRef,
+ /// List of files to be processed, grouped into partitions
+ pub file_groups: Vec<Vec<PartitionedFile>>,
+ /// Estimated overall statistics of the files, taking `filters` into account.
+ pub statistics: Statistics,
+ /// Columns on which to project the data. Indexes that are higher than the
+ /// number of columns of `file_schema` refer to `table_partition_cols`.
+ pub projection: Option<Vec<usize>>,
+ /// The maximum number of records per arrow column
+ pub batch_size: usize,
+ /// The minimum number of records required from this source plan
+ pub limit: Option<usize>,
+ /// The partitioning column names
+ pub table_partition_cols: Vec<String>,
+}
+
+impl PhysicalPlanConfig {
+ /// Project the schema and the statistics on the given column indices
+ fn project(&self) -> (SchemaRef, Statistics) {
Review comment:
`PhysicalPlanConfig` was originally intended to reduce the number of
arguments passed around from the table provider to the execution plans, but it
ended up as a nice receiver for various schema and column name
projection/extraction operations that were present in multiple places.
##########
File path: datafusion/src/datasource/listing/helpers.rs
##########
@@ -0,0 +1,729 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Helper functions for the table implementation
Review comment:
I tried to keep the methods well documented, even though these are
private methods. I hope this will help reviewers in their task. Note here that
`pruned_partition_list` and `split_files` appear as new but were actually
ported from `listing.rs`.
##########
File path: datafusion/src/physical_plan/file_format/mod.rs
##########
@@ -24,19 +24,134 @@ mod json;
mod parquet;
pub use self::parquet::ParquetExec;
+use arrow::{
+ array::{ArrayData, ArrayRef, DictionaryArray, UInt8BufferBuilder},
+ buffer::Buffer,
+ datatypes::{DataType, Field, Schema, SchemaRef, UInt8Type},
+ error::{ArrowError, Result as ArrowResult},
+ record_batch::RecordBatch,
+};
pub use avro::AvroExec;
pub use csv::CsvExec;
pub use json::NdJsonExec;
-use crate::datasource::PartitionedFile;
-use std::fmt::{Display, Formatter, Result};
+use crate::{
+ datasource::{object_store::ObjectStore, PartitionedFile},
+ scalar::ScalarValue,
+};
+use std::{
+ collections::HashMap,
+ fmt::{Display, Formatter, Result as FmtResult},
+ sync::Arc,
+ vec,
+};
+
+use super::{ColumnStatistics, Statistics};
+
+lazy_static! {
+ /// The datatype used for all partitioning columns for now
+ pub static ref DEFAULT_PARTITION_COLUMN_DATATYPE: DataType = DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
+}
+
+/// The base configurations to provide when creating a physical plan for
+/// any given file format.
+#[derive(Debug, Clone)]
+pub struct PhysicalPlanConfig {
+ /// Store from which the `files` should be fetched
+ pub object_store: Arc<dyn ObjectStore>,
+ /// Schema before projection. It contains the columns that are expected
+ /// to be in the files without the table partition columns.
+ pub file_schema: SchemaRef,
Review comment:
We systematically specify what each schema represents:
- `file_schema` means that only the columns that can be found within the
file are described
- `table_schema` means that on top of the file columns, we have partition
columns
- `projected_schema` means that the column projection was applied
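
The relationship between the three schemas can be sketched as follows. This is a simplified model in which a schema is just a list of column names; `ColumnNames`, `table_schema` and `projected_schema` are hypothetical helpers written for this example, not functions from the PR. It follows the convention stated in the `projection` doc comment: indexes past the end of `file_schema` refer to `table_partition_cols`.

```rust
/// Hypothetical stand-in for a schema: an ordered list of column names.
pub type ColumnNames = Vec<String>;

/// `table_schema` = file columns followed by the partition columns.
pub fn table_schema(file_schema: &[String], table_partition_cols: &[String]) -> ColumnNames {
    file_schema
        .iter()
        .chain(table_partition_cols)
        .cloned()
        .collect()
}

/// `projected_schema` = table schema restricted to `projection`
/// (all columns when `projection` is `None`).
/// Returns `None` if any index is out of bounds.
pub fn projected_schema(
    table_schema: &[String],
    projection: Option<&[usize]>,
) -> Option<ColumnNames> {
    match projection {
        None => Some(table_schema.to_vec()),
        Some(indices) => indices
            .iter()
            .map(|&i| table_schema.get(i).cloned())
            .collect(),
    }
}
```

For a file with columns `id, bool_col` and one partition column `date`, the projection `[0, 2]` yields `id, date`: index 2 is past the file columns, so it resolves to the partition column.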
##########
File path: datafusion/src/physical_plan/file_format/avro.rs
##########
@@ -185,69 +141,134 @@ impl ExecutionPlan for AvroExec {
write!(
f,
"AvroExec: files={}, batch_size={}, limit={:?}",
- super::FileGroupsDisplay(&self.file_groups),
- self.batch_size,
- self.limit,
+ super::FileGroupsDisplay(&self.base_config.file_groups),
+ self.base_config.batch_size,
+ self.base_config.limit,
)
}
}
}
fn statistics(&self) -> Statistics {
- self.statistics.clone()
+ self.projected_statistics.clone()
}
}
#[cfg(test)]
#[cfg(feature = "avro")]
mod tests {
+ use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
use crate::datasource::object_store::local::{
- local_file_meta, local_object_reader_stream, LocalFileSystem,
+ local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
};
+ use crate::scalar::ScalarValue;
+ use futures::StreamExt;
use super::*;
#[tokio::test]
- async fn test() -> Result<()> {
- use futures::StreamExt;
-
- use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
-
+ async fn avro_exec_without_partition() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{}/avro/alltypes_plain.avro", testdata);
- let avro_exec = AvroExec::new(
- Arc::new(LocalFileSystem {}),
- vec![vec![PartitionedFile {
- file_meta: local_file_meta(filename.clone()),
- }]],
- Statistics::default(),
- AvroFormat {}
+ let avro_exec = AvroExec::new(PhysicalPlanConfig {
+ object_store: Arc::new(LocalFileSystem {}),
+ file_groups: vec![vec![local_unpartitioned_file(filename.clone())]],
+ file_schema: AvroFormat {}
.infer_schema(local_object_reader_stream(vec![filename]))
.await?,
- Some(vec![0, 1, 2]),
- 1024,
- None,
- );
+ statistics: Statistics::default(),
+ projection: Some(vec![0, 1, 2]),
+ batch_size: 1024,
+ limit: None,
+ table_partition_cols: vec![],
+ });
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
- let mut results = avro_exec.execute(0).await?;
- let batch = results.next().await.unwrap()?;
+ let mut results = avro_exec.execute(0).await.expect("plan execution failed");
+ let batch = results
+ .next()
+ .await
+ .expect("plan iterator empty")
+ .expect("plan iterator returned an error");
- assert_eq!(8, batch.num_rows());
- assert_eq!(3, batch.num_columns());
+ let expected = vec![
+ "+----+----------+-------------+",
+ "| id | bool_col | tinyint_col |",
+ "+----+----------+-------------+",
+ "| 4 | true | 0 |",
+ "| 5 | false | 1 |",
+ "| 6 | true | 0 |",
+ "| 7 | false | 1 |",
+ "| 2 | true | 0 |",
+ "| 3 | false | 1 |",
+ "| 0 | true | 0 |",
+ "| 1 | false | 1 |",
+ "+----+----------+-------------+",
+ ];
- let schema = batch.schema();
- let field_names: Vec<&str> =
- schema.fields().iter().map(|f| f.name().as_str()).collect();
- assert_eq!(vec!["id", "bool_col", "tinyint_col"], field_names);
+ crate::assert_batches_eq!(expected, &[batch]);
Review comment:
Moved to `assert_batches_eq` in both existing tests and new tests
(focused on execution plans with partitioning) because this helps visualize
the differences at a glance.
##########
File path: datafusion/src/physical_plan/file_format/file_stream.rs
##########
@@ -63,52 +64,73 @@ impl<T> FormatReaderOpener for T where
pub struct FileStream<F: FormatReaderOpener> {
/// An iterator over record batches of the last file returned by file_iter
batch_iter: BatchIter,
- /// An iterator over input files
+ /// Partitioning column values for the current batch_iter
+ partition_values: Vec<ScalarValue>,
+ /// An iterator over input files.
file_iter: FileIter,
- /// The stream schema (file schema after projection)
- schema: SchemaRef,
+ /// The stream schema (file schema including partition columns and after
+ /// projection).
+ projected_schema: SchemaRef,
/// The remaining number of records to parse, None if no limit
remain: Option<usize>,
/// A closure that takes a reader and an optional remaining number of lines
/// (before reaching the limit) and returns a batch iterator. If the file reader
/// is not capable of limiting the number of records in the last batch, the file
/// stream will take care of truncating it.
file_reader: F,
+ /// The partition column projector
+ pc_projector: PartitionColumnProjector,
+ /// the store from which to source the files.
+ object_store: Arc<dyn ObjectStore>,
}
Review comment:
Reworked the `FileStream` helper to allow the injection of partition
columns into the file record batches when necessary.
##########
File path: datafusion/tests/path_partition.rs
##########
@@ -0,0 +1,235 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Test queries on partitioned datasets
Review comment:
We introduced a whole new file for tests that focus on the partitioning
because the `sql.rs` test file was already saturated. This is maybe not the
best split for integration tests.
##########
File path: datafusion/src/datasource/listing/table.rs
##########
@@ -96,37 +97,48 @@ impl ListingOptions {
.await?
.map(move |file_meta| object_store.file_reader(file_meta?.sized_file));
let file_schema = self.format.infer_schema(Box::pin(file_stream)).await?;
- // Add the partition columns to the file schema
- let mut fields = file_schema.fields().clone();
- for part in &self.partitions {
- fields.push(Field::new(part, DataType::Utf8, false));
- }
- Ok(Arc::new(Schema::new(fields)))
+ Ok(file_schema)
}
}
/// An implementation of `TableProvider` that uses the object store
/// or file system listing capability to get the list of files.
pub struct ListingTable {
object_store: Arc<dyn ObjectStore>,
- path: String,
- schema: SchemaRef,
+ table_path: String,
+ /// File fields only
+ file_schema: SchemaRef,
+ /// File fields + partition columns
+ table_schema: SchemaRef,
options: ListingOptions,
}
impl ListingTable {
/// Create new table that lists the FS to get the files to scan.
+ /// The provided `schema` must be resolved before creating the table
+ /// and should contain the fields of the file without the table
+ /// partitioning columns.
pub fn new(
object_store: Arc<dyn ObjectStore>,
- path: String,
- // the schema must be resolved before creating the table
- schema: SchemaRef,
+ table_path: String,
Review comment:
Just as with schemas, we try to be more explicit, indicating whether we
are dealing with `table_path` or `file_path`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.