This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0c4e4a1795 Minor Fix for Logical and Physical Expr Conversions (#11142)
0c4e4a1795 is described below
commit 0c4e4a179558bd7f1425d96f286415c9aa9174ca
Author: Berkay Şahin <[email protected]>
AuthorDate: Thu Jun 27 21:25:36 2024 +0300
Minor Fix for Logical and Physical Expr Conversions (#11142)
* Minor
* Update planner.rs
---
.../src/datasource/physical_plan/parquet/mod.rs | 14 +++---------
.../datasource/physical_plan/parquet/row_filter.rs | 25 +++++++---------------
.../datasource/physical_plan/parquet/row_groups.rs | 18 ++++++----------
datafusion/core/src/physical_optimizer/pruning.rs | 16 +++++---------
.../physical-expr-common/src/aggregate/mod.rs | 3 +++
datafusion/physical-expr/src/planner.rs | 22 ++++++++++++-------
datafusion/physical-expr/src/utils/guarantee.rs | 16 +++++---------
7 files changed, 45 insertions(+), 69 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 959e50fac8..ea7faac08c 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -796,17 +796,15 @@ mod tests {
ArrayRef, Date64Array, Int32Array, Int64Array, Int8Array, StringArray,
StructArray,
};
-
use arrow::datatypes::{Field, Schema, SchemaBuilder};
use arrow::record_batch::RecordBatch;
use arrow_schema::Fields;
- use datafusion_common::{assert_contains, FileType, GetExt, ScalarValue, ToDFSchema};
- use datafusion_expr::execution_props::ExecutionProps;
+ use datafusion_common::{assert_contains, FileType, GetExt, ScalarValue};
use datafusion_expr::{col, lit, when, Expr};
- use datafusion_physical_expr::create_physical_expr;
+ use datafusion_physical_expr::planner::logical2physical;
+ use datafusion_physical_plan::ExecutionPlanProperties;
use chrono::{TimeZone, Utc};
- use datafusion_physical_plan::ExecutionPlanProperties;
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
@@ -2061,12 +2059,6 @@ mod tests {
Ok(())
}
- fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr>
{
- let df_schema = schema.clone().to_dfschema().unwrap();
- let execution_props = ExecutionProps::new();
- create_physical_expr(expr, &df_schema, &execution_props).unwrap()
- }
-
#[tokio::test]
async fn test_struct_filter_parquet() -> Result<()> {
let tmp_dir = TempDir::new()?;
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
index 18c6c51d28..f9cce5f783 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
@@ -410,23 +410,20 @@ pub fn build_row_filter(
#[cfg(test)]
mod test {
- use arrow::datatypes::Field;
- use arrow_schema::TimeUnit::Nanosecond;
- use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
- use parquet::arrow::parquet_to_arrow_schema;
- use parquet::file::reader::{FileReader, SerializedFileReader};
- use rand::prelude::*;
-
+ use super::*;
use crate::datasource::schema_adapter::DefaultSchemaAdapterFactory;
use crate::datasource::schema_adapter::SchemaAdapterFactory;
- use datafusion_common::ToDFSchema;
- use datafusion_expr::execution_props::ExecutionProps;
+ use arrow::datatypes::Field;
+ use arrow_schema::TimeUnit::Nanosecond;
use datafusion_expr::{cast, col, lit, Expr};
- use datafusion_physical_expr::create_physical_expr;
+ use datafusion_physical_expr::planner::logical2physical;
use datafusion_physical_plan::metrics::{Count, Time};
- use super::*;
+ use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+ use parquet::arrow::parquet_to_arrow_schema;
+ use parquet::file::reader::{FileReader, SerializedFileReader};
+ use rand::prelude::*;
// We should ignore predicate that read non-primitive columns
#[test]
@@ -590,10 +587,4 @@ mod test {
assert_eq!(projection, remapped)
}
}
-
- fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr>
{
- let df_schema = schema.clone().to_dfschema().unwrap();
- let execution_props = ExecutionProps::new();
- create_physical_expr(expr, &df_schema, &execution_props).unwrap()
- }
}
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index da8b793a5c..9bc7980574 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -404,15 +404,19 @@ impl<'a> PruningStatistics for
RowGroupPruningStatistics<'a> {
#[cfg(test)]
mod tests {
+ use std::ops::Rem;
+ use std::sync::Arc;
+
use super::*;
use crate::datasource::physical_plan::parquet::reader::ParquetFileReader;
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
+
use arrow::datatypes::DataType::Decimal128;
use arrow::datatypes::{DataType, Field};
- use datafusion_common::{Result, ToDFSchema};
- use datafusion_expr::execution_props::ExecutionProps;
+ use datafusion_common::Result;
use datafusion_expr::{cast, col, lit, Expr};
- use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
+ use datafusion_physical_expr::planner::logical2physical;
+
use parquet::arrow::arrow_to_parquet_schema;
use parquet::arrow::async_reader::ParquetObjectReader;
use parquet::basic::LogicalType;
@@ -422,8 +426,6 @@ mod tests {
basic::Type as PhysicalType, file::statistics::Statistics as
ParquetStatistics,
schema::types::SchemaDescPtr,
};
- use std::ops::Rem;
- use std::sync::Arc;
struct PrimitiveTypeField {
name: &'static str,
@@ -1111,12 +1113,6 @@ mod tests {
ParquetFileMetrics::new(0, "file.parquet", &metrics)
}
- fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr>
{
- let df_schema = schema.clone().to_dfschema().unwrap();
- let execution_props = ExecutionProps::new();
- create_physical_expr(expr, &df_schema, &execution_props).unwrap()
- }
-
#[tokio::test]
async fn test_row_group_bloom_filter_pruning_predicate_simple_expr() {
BloomFilterTest::new_data_index_bloom_encoding_stats()
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs
b/datafusion/core/src/physical_optimizer/pruning.rs
index 7051dd9978..e8f2f34abd 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -1555,22 +1555,22 @@ pub(crate) enum StatisticsType {
#[cfg(test)]
mod tests {
+ use std::collections::HashMap;
+ use std::ops::{Not, Rem};
+
use super::*;
use crate::assert_batches_eq;
use crate::logical_expr::{col, lit};
+
use arrow::array::Decimal128Array;
use arrow::{
array::{BinaryArray, Int32Array, Int64Array, StringArray},
datatypes::TimeUnit,
};
use arrow_array::UInt64Array;
- use datafusion_common::ToDFSchema;
- use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr::InList;
use datafusion_expr::{cast, is_null, try_cast, Expr};
- use datafusion_physical_expr::create_physical_expr;
- use std::collections::HashMap;
- use std::ops::{Not, Rem};
+ use datafusion_physical_expr::planner::logical2physical;
#[derive(Debug, Default)]
/// Mock statistic provider for tests
@@ -3876,10 +3876,4 @@ mod tests {
let expr = logical2physical(expr, schema);
build_predicate_expression(&expr, schema, required_columns)
}
-
- fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr>
{
- let df_schema = schema.clone().to_dfschema().unwrap();
- let execution_props = ExecutionProps::new();
- create_physical_expr(expr, &df_schema, &execution_props).unwrap()
- }
}
diff --git a/datafusion/physical-expr-common/src/aggregate/mod.rs
b/datafusion/physical-expr-common/src/aggregate/mod.rs
index 432267e045..336e28b4d2 100644
--- a/datafusion/physical-expr-common/src/aggregate/mod.rs
+++ b/datafusion/physical-expr-common/src/aggregate/mod.rs
@@ -211,6 +211,9 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
/// Rewrites [`AggregateExpr`], with new expressions given. The argument should be consistent
/// with the return value of the [`AggregateExpr::all_expressions`] method.
/// Returns `Some(Arc<dyn AggregateExpr>)` if re-write is supported, otherwise returns `None`.
+ /// TODO: This method only rewrites the [`PhysicalExpr`]s and does not handle [`Expr`]s.
+ /// This can cause silent bugs and should be fixed in the future (possibly with physical-to-logical
+ /// conversions).
fn with_new_expressions(
&self,
_args: Vec<Arc<dyn PhysicalExpr>>,
diff --git a/datafusion/physical-expr/src/planner.rs
b/datafusion/physical-expr/src/planner.rs
index 29b9069c04..8fe99cdca5 100644
--- a/datafusion/physical-expr/src/planner.rs
+++ b/datafusion/physical-expr/src/planner.rs
@@ -17,10 +17,15 @@
use std::sync::Arc;
-use arrow::datatypes::Schema;
+use crate::scalar_function;
+use crate::{
+ expressions::{self, binary, like, Column, Literal},
+ PhysicalExpr,
+};
+use arrow::datatypes::Schema;
use datafusion_common::{
- exec_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue,
+ exec_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, ToDFSchema,
};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr::{Alias, Cast, InList, ScalarFunction};
@@ -28,12 +33,6 @@ use datafusion_expr::var_provider::is_system_variables;
use datafusion_expr::var_provider::VarType;
use datafusion_expr::{binary_expr, Between, BinaryExpr, Expr, Like, Operator,
TryCast};
-use crate::scalar_function;
-use crate::{
- expressions::{self, binary, like, Column, Literal},
- PhysicalExpr,
-};
-
/// [PhysicalExpr] evaluate DataFusion expressions such as `A + 1`, or `CAST(c1
/// AS int)`.
///
@@ -358,6 +357,13 @@ where
.collect::<Result<Vec<_>>>()
}
+/// Convert a logical expression to a physical expression (without any simplification, etc)
+pub fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
+ let df_schema = schema.clone().to_dfschema().unwrap();
+ let execution_props = ExecutionProps::new();
+ create_physical_expr(expr, &df_schema, &execution_props).unwrap()
+}
+
#[cfg(test)]
mod tests {
use arrow_array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
diff --git a/datafusion/physical-expr/src/utils/guarantee.rs
b/datafusion/physical-expr/src/utils/guarantee.rs
index deaff54538..070034116f 100644
--- a/datafusion/physical-expr/src/utils/guarantee.rs
+++ b/datafusion/physical-expr/src/utils/guarantee.rs
@@ -419,15 +419,16 @@ impl<'a> ColOpLit<'a> {
#[cfg(test)]
mod test {
+ use std::sync::OnceLock;
+
use super::*;
- use crate::create_physical_expr;
+ use crate::planner::logical2physical;
+
use arrow_schema::{DataType, Field, Schema, SchemaRef};
- use datafusion_common::ToDFSchema;
- use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr_fn::*;
use datafusion_expr::{lit, Expr};
+
use itertools::Itertools;
- use std::sync::OnceLock;
#[test]
fn test_literal() {
@@ -867,13 +868,6 @@ mod test {
LiteralGuarantee::try_new(column, Guarantee::NotIn,
literals.iter()).unwrap()
}
- /// Convert a logical expression to a physical expression (without any
simplification, etc)
- fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr>
{
- let df_schema = schema.clone().to_dfschema().unwrap();
- let execution_props = ExecutionProps::new();
- create_physical_expr(expr, &df_schema, &execution_props).unwrap()
- }
-
// Schema for testing
fn schema() -> SchemaRef {
SCHEMA
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]