This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e8520ab719 fix bugs explain with non-correlated query (#13210)
e8520ab719 is described below
commit e8520ab7199c341c7767bdf7375f65d5ae9ab3fa
Author: Lordworms <[email protected]>
AuthorDate: Tue Nov 5 08:08:52 2024 -0800
fix bugs explain with non-correlated query (#13210)
* fix bugs explain with non-correlated query
* Use explicit enum for physical errors
* fix comments / fmt
* strip_backtrace to pass ci
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/common/src/display/mod.rs | 7 ++++++-
datafusion/core/src/datasource/listing/table.rs | 12 ++++++------
datafusion/core/src/physical_planner.rs | 8 ++++++--
datafusion/physical-plan/src/explain.rs | 1 -
datafusion/proto/proto/datafusion.proto | 1 +
datafusion/proto/src/generated/pbjson.rs | 13 +++++++++++++
datafusion/proto/src/generated/prost.rs | 4 +++-
datafusion/proto/src/logical_plan/from_proto.rs | 3 ++-
datafusion/proto/src/logical_plan/to_proto.rs | 4 ++++
datafusion/sqllogictest/test_files/explain.slt | 25 +++++++++++++++++++++++++
datafusion/sqllogictest/test_files/group_by.slt | 1 +
datafusion/sqllogictest/test_files/joins.slt | 4 +++-
datafusion/sqllogictest/test_files/prepare.slt | 2 ++
datafusion/sqllogictest/test_files/update.slt | 9 +++++++--
14 files changed, 79 insertions(+), 15 deletions(-)
diff --git a/datafusion/common/src/display/mod.rs b/datafusion/common/src/display/mod.rs
index c12e7419e4..bad51c45f8 100644
--- a/datafusion/common/src/display/mod.rs
+++ b/datafusion/common/src/display/mod.rs
@@ -62,6 +62,8 @@ pub enum PlanType {
FinalPhysicalPlanWithStats,
/// The final with schema, fully optimized physical plan which would be
executed
FinalPhysicalPlanWithSchema,
+ /// An error creating the physical plan
+ PhysicalPlanError,
}
impl Display for PlanType {
@@ -91,6 +93,7 @@ impl Display for PlanType {
PlanType::FinalPhysicalPlanWithSchema => {
write!(f, "physical_plan_with_schema")
}
+ PlanType::PhysicalPlanError => write!(f, "physical_plan_error"),
}
}
}
@@ -118,7 +121,9 @@ impl StringifiedPlan {
/// `verbose_mode = true` will display all available plans
pub fn should_display(&self, verbose_mode: bool) -> bool {
match self.plan_type {
- PlanType::FinalLogicalPlan | PlanType::FinalPhysicalPlan => true,
+ PlanType::FinalLogicalPlan
+ | PlanType::FinalPhysicalPlan
+ | PlanType::PhysicalPlanError => true,
_ => verbose_mode,
}
}
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 15125fe5a0..b937a28e93 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -880,18 +880,18 @@ impl TableProvider for ListingTable {
None => {} // no ordering required
};
- let filters = conjunction(filters.to_vec())
- .map(|expr| -> Result<_> {
- // NOTE: Use the table schema (NOT file schema) here because
`expr` may contain references to partition columns.
+ let filters = match conjunction(filters.to_vec()) {
+ Some(expr) => {
let table_df_schema =
self.table_schema.as_ref().clone().to_dfschema()?;
let filters = create_physical_expr(
&expr,
&table_df_schema,
state.execution_props(),
)?;
- Ok(Some(filters))
- })
- .unwrap_or(Ok(None))?;
+ Some(filters)
+ }
+ None => None,
+ };
let Some(object_store_url) =
self.table_paths.first().map(ListingTableUrl::object_store)
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 2a96a2ad11..7d475ad2e2 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1797,8 +1797,12 @@ impl DefaultPhysicalPlanner {
Err(e) => return Err(e),
}
}
- Err(e) => stringified_plans
- .push(StringifiedPlan::new(InitialPhysicalPlan,
e.to_string())),
+ Err(err) => {
+ stringified_plans.push(StringifiedPlan::new(
+ PhysicalPlanError,
+ err.strip_backtrace(),
+ ));
+ }
}
}
diff --git a/datafusion/physical-plan/src/explain.rs b/datafusion/physical-plan/src/explain.rs
index 96f55a1446..cc42e05871 100644
--- a/datafusion/physical-plan/src/explain.rs
+++ b/datafusion/physical-plan/src/explain.rs
@@ -132,7 +132,6 @@ impl ExecutionPlan for ExplainExec {
if 0 != partition {
return internal_err!("ExplainExec invalid partition {partition}");
}
-
let mut type_builder =
StringBuilder::with_capacity(self.stringified_plans.len(), 1024);
let mut plan_builder =
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index b68c47c57e..d6fa129edc 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -655,6 +655,7 @@ message PlanType {
datafusion_common.EmptyMessage FinalPhysicalPlan = 6;
datafusion_common.EmptyMessage FinalPhysicalPlanWithStats = 10;
datafusion_common.EmptyMessage FinalPhysicalPlanWithSchema = 12;
+ datafusion_common.EmptyMessage PhysicalPlanError = 13;
}
}
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index e54edb7188..16f14d9ddf 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -16683,6 +16683,9 @@ impl serde::Serialize for PlanType {
plan_type::PlanTypeEnum::FinalPhysicalPlanWithSchema(v) => {
struct_ser.serialize_field("FinalPhysicalPlanWithSchema",
v)?;
}
+ plan_type::PlanTypeEnum::PhysicalPlanError(v) => {
+ struct_ser.serialize_field("PhysicalPlanError", v)?;
+ }
}
}
struct_ser.end()
@@ -16707,6 +16710,7 @@ impl<'de> serde::Deserialize<'de> for PlanType {
"FinalPhysicalPlan",
"FinalPhysicalPlanWithStats",
"FinalPhysicalPlanWithSchema",
+ "PhysicalPlanError",
];
#[allow(clippy::enum_variant_names)]
@@ -16723,6 +16727,7 @@ impl<'de> serde::Deserialize<'de> for PlanType {
FinalPhysicalPlan,
FinalPhysicalPlanWithStats,
FinalPhysicalPlanWithSchema,
+ PhysicalPlanError,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -16756,6 +16761,7 @@ impl<'de> serde::Deserialize<'de> for PlanType {
"FinalPhysicalPlan" =>
Ok(GeneratedField::FinalPhysicalPlan),
"FinalPhysicalPlanWithStats" =>
Ok(GeneratedField::FinalPhysicalPlanWithStats),
"FinalPhysicalPlanWithSchema" =>
Ok(GeneratedField::FinalPhysicalPlanWithSchema),
+ "PhysicalPlanError" =>
Ok(GeneratedField::PhysicalPlanError),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -16860,6 +16866,13 @@ impl<'de> serde::Deserialize<'de> for PlanType {
return
Err(serde::de::Error::duplicate_field("FinalPhysicalPlanWithSchema"));
}
plan_type_enum__ =
map_.next_value::<::std::option::Option<_>>()?.map(plan_type::PlanTypeEnum::FinalPhysicalPlanWithSchema)
+;
+ }
+ GeneratedField::PhysicalPlanError => {
+ if plan_type_enum__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("PhysicalPlanError"));
+ }
+ plan_type_enum__ =
map_.next_value::<::std::option::Option<_>>()?.map(plan_type::PlanTypeEnum::PhysicalPlanError)
;
}
}
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index dfc30e8091..59a90eb31a 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -888,7 +888,7 @@ pub struct OptimizedPhysicalPlanType {
pub struct PlanType {
#[prost(
oneof = "plan_type::PlanTypeEnum",
- tags = "1, 7, 8, 2, 3, 4, 9, 11, 5, 6, 10, 12"
+ tags = "1, 7, 8, 2, 3, 4, 9, 11, 5, 6, 10, 12, 13"
)]
pub plan_type_enum: ::core::option::Option<plan_type::PlanTypeEnum>,
}
@@ -920,6 +920,8 @@ pub mod plan_type {
FinalPhysicalPlanWithStats(super::super::datafusion_common::EmptyMessage),
#[prost(message, tag = "12")]
FinalPhysicalPlanWithSchema(super::super::datafusion_common::EmptyMessage),
+ #[prost(message, tag = "13")]
+ PhysicalPlanError(super::super::datafusion_common::EmptyMessage),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs
index f25fb0bf25..33b7185588 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -44,7 +44,7 @@ use crate::protobuf::{
AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
FinalPhysicalPlan, FinalPhysicalPlanWithStats, InitialLogicalPlan,
InitialPhysicalPlan, InitialPhysicalPlanWithStats,
OptimizedLogicalPlan,
- OptimizedPhysicalPlan,
+ OptimizedPhysicalPlan, PhysicalPlanError,
},
AnalyzedLogicalPlanType, CubeNode, GroupingSetNode,
OptimizedLogicalPlanType,
OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
@@ -141,6 +141,7 @@ impl From<&protobuf::StringifiedPlan> for StringifiedPlan {
FinalPhysicalPlan(_) => PlanType::FinalPhysicalPlan,
FinalPhysicalPlanWithStats(_) =>
PlanType::FinalPhysicalPlanWithStats,
FinalPhysicalPlanWithSchema(_) =>
PlanType::FinalPhysicalPlanWithSchema,
+ PhysicalPlanError(_) => PlanType::PhysicalPlanError,
},
plan: Arc::new(stringified_plan.plan.clone()),
}
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs
index 8af7b19d90..a5497b2c15 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -38,6 +38,7 @@ use crate::protobuf::{
FinalPhysicalPlan, FinalPhysicalPlanWithSchema,
FinalPhysicalPlanWithStats,
InitialLogicalPlan, InitialPhysicalPlan, InitialPhysicalPlanWithSchema,
InitialPhysicalPlanWithStats, OptimizedLogicalPlan,
OptimizedPhysicalPlan,
+ PhysicalPlanError,
},
AnalyzedLogicalPlanType, CubeNode, EmptyMessage, GroupingSetNode,
LogicalExprList,
OptimizedLogicalPlanType, OptimizedPhysicalPlanType, PlaceholderNode,
RollupNode,
@@ -115,6 +116,9 @@ impl From<&StringifiedPlan> for protobuf::StringifiedPlan {
PlanType::FinalPhysicalPlanWithSchema =>
Some(protobuf::PlanType {
plan_type_enum:
Some(FinalPhysicalPlanWithSchema(EmptyMessage {})),
}),
+ PlanType::PhysicalPlanError => Some(protobuf::PlanType {
+ plan_type_enum: Some(PhysicalPlanError(EmptyMessage {})),
+ }),
},
plan: stringified_plan.plan.to_string(),
}
diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt
index 54658f36ca..f3fee4f1fc 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -411,3 +411,28 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[{c0:1,c1:2.3,c2:abc} as
struct(Int64(1),Float64(2.3),Utf8("abc"))]
02)--PlaceholderRowExec
+
+
+statement ok
+create table t1(a int);
+
+statement ok
+create table t2(b int);
+
+query TT
+explain select a from t1 where exists (select count(*) from t2);
+----
+logical_plan
+01)Filter: EXISTS (<subquery>)
+02)--Subquery:
+03)----Projection: count(*)
+04)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+05)--------TableScan: t2
+06)--TableScan: t1 projection=[a]
+physical_plan_error This feature is not implemented: Physical plan does not
support logical expression Exists(Exists { subquery: <subquery>, negated: false
})
+
+statement ok
+drop table t1;
+
+statement ok
+drop table t2;
diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt
index daf2701908..4b90ddf2ea 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -4082,6 +4082,7 @@ logical_plan
02)--Projection: multiple_ordered_table_with_pk.a,
multiple_ordered_table_with_pk.b, sum(multiple_ordered_table_with_pk.d) AS sum1
03)----Aggregate: groupBy=[[multiple_ordered_table_with_pk.c,
multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b]],
aggr=[[sum(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
04)------TableScan: multiple_ordered_table_with_pk projection=[a, b, c, d]
+physical_plan_error This feature is not implemented: Physical plan does not
support DistributeBy partitioning
# union with aggregate
query TT
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index 93bb1f1f54..d45dbc7ee1 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -4053,7 +4053,7 @@ query TT
explain select t1_id, t1_name, i from join_t1 t1 cross join lateral (select *
from unnest(generate_series(1, t1_int))) as series(i);
----
logical_plan
-01)Cross Join:
+01)Cross Join:
02)--SubqueryAlias: t1
03)----TableScan: join_t1 projection=[t1_id, t1_name]
04)--SubqueryAlias: series
@@ -4062,6 +4062,7 @@ logical_plan
07)--------Unnest:
lists[unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)))|depth=1]
structs[]
08)----------Projection: generate_series(Int64(1), CAST(outer_ref(t1.t1_int)
AS Int64)) AS unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)))
09)------------EmptyRelation
+physical_plan_error This feature is not implemented: Physical plan does not
support logical expression OuterReferenceColumn(UInt32, Column { relation:
Some(Bare { table: "t1" }), name: "t1_int" })
# Test CROSS JOIN LATERAL syntax (execution)
@@ -4084,6 +4085,7 @@ logical_plan
07)--------Unnest:
lists[unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)))|depth=1]
structs[]
08)----------Projection: generate_series(Int64(1), CAST(outer_ref(t2.t1_int)
AS Int64)) AS unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)))
09)------------EmptyRelation
+physical_plan_error This feature is not implemented: Physical plan does not
support logical expression OuterReferenceColumn(UInt32, Column { relation:
Some(Bare { table: "t2" }), name: "t1_int" })
# Test INNER JOIN LATERAL syntax (execution)
diff --git a/datafusion/sqllogictest/test_files/prepare.slt b/datafusion/sqllogictest/test_files/prepare.slt
index e306ec7767..91b925efa2 100644
--- a/datafusion/sqllogictest/test_files/prepare.slt
+++ b/datafusion/sqllogictest/test_files/prepare.slt
@@ -86,11 +86,13 @@ query TT
EXPLAIN EXECUTE my_plan;
----
logical_plan Execute: my_plan params=[]
+physical_plan_error This feature is not implemented: Unsupported logical plan:
Execute
query TT
EXPLAIN EXECUTE my_plan(10*2 + 1, 'Foo');
----
logical_plan Execute: my_plan params=[Int64(21), Utf8("Foo")]
+physical_plan_error This feature is not implemented: Unsupported logical plan:
Execute
query error DataFusion error: Schema error: No field named a\.
EXPLAIN EXECUTE my_plan(a);
diff --git a/datafusion/sqllogictest/test_files/update.slt b/datafusion/sqllogictest/test_files/update.slt
index aaba6998ee..0f9582b04c 100644
--- a/datafusion/sqllogictest/test_files/update.slt
+++ b/datafusion/sqllogictest/test_files/update.slt
@@ -33,6 +33,7 @@ logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: CAST(Int64(1) AS Int32) AS a, CAST(Int64(2) AS Utf8) AS b,
Float64(3) AS c, CAST(NULL AS Int32) AS d
03)----TableScan: t1
+physical_plan_error This feature is not implemented: Unsupported logical plan:
Dml(Update)
query TT
explain update t1 set a=c+1, b=a, c=c+1.0, d=b;
@@ -41,6 +42,7 @@ logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: CAST(t1.c + CAST(Int64(1) AS Float64) AS Int32) AS a,
CAST(t1.a AS Utf8) AS b, t1.c + Float64(1) AS c, CAST(t1.b AS Int32) AS d
03)----TableScan: t1
+physical_plan_error This feature is not implemented: Unsupported logical plan:
Dml(Update)
statement ok
create table t2(a int, b varchar, c double, d int);
@@ -58,6 +60,7 @@ logical_plan
06)----------Filter: outer_ref(t1.a) = t2.a
07)------------TableScan: t2
08)----TableScan: t1
+physical_plan_error This feature is not implemented: Physical plan does not
support logical expression ScalarSubquery(<subquery>)
# set from other table
query TT
@@ -67,9 +70,10 @@ logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: t1.a AS a, t2.b AS b, CAST(t2.a AS Float64) AS c,
CAST(Int64(1) AS Int32) AS d
03)----Filter: t1.a = t2.a AND t1.b > Utf8("foo") AND t2.c > Float64(1)
-04)------Cross Join:
+04)------Cross Join:
05)--------TableScan: t1
06)--------TableScan: t2
+physical_plan_error This feature is not implemented: Unsupported logical plan:
Dml(Update)
statement ok
create table t3(a int, b varchar, c double, d int);
@@ -86,7 +90,8 @@ logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: t.a AS a, t2.b AS b, CAST(t.a AS Float64) AS c, CAST(Int64(1)
AS Int32) AS d
03)----Filter: t.a = t2.a AND t.b > Utf8("foo") AND t2.c > Float64(1)
-04)------Cross Join:
+04)------Cross Join:
05)--------SubqueryAlias: t
06)----------TableScan: t1
07)--------TableScan: t2
+physical_plan_error This feature is not implemented: Unsupported logical plan:
Dml(Update)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]