Rommel Yuan created SPARK-49192:
-----------------------------------
Summary: RuleExecutor doesn't work on CTE UnresolvedWith
Key: SPARK-49192
URL: https://issues.apache.org/jira/browse/SPARK-49192
Project: Spark
Issue Type: Bug
Components: Spark Core, SQL
Affects Versions: 3.4.1, 3.3.1
Reporter: Rommel Yuan
I want to set up a LogicalPlan rule that replaces table names: given a list of
table names, any table whose name matches an entry in the list is replaced with
a different table.
Here is the code:
{code:java}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, UnresolvedWith}
import org.apache.spark.sql.catalyst.rules.Rule

case class RenameTableRule(refs: List[NamedQueryRef]) extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    // println(plan.getClass.getName)
    val transformedPlan = plan transform {
      case unresolvedRelation: UnresolvedRelation =>
        val tblSchemaName = unresolvedRelation.tableName.split("\\.")
        if (tblSchemaName.length == 1) return plan
        val schema = tblSchemaName.apply(0)
        val tblName = tblSchemaName.apply(1)
        for (ref <- this.refs) {
          if (schema == "XXX" && tblName == ref.nqName)
            return unresolvedRelation.copy(
              multipartIdentifier = Seq(ref.materializedInSchema, ref.materializedInTableName),
              unresolvedRelation.options,
              unresolvedRelation.isStreaming)
        }
        unresolvedRelation
      case unresolvedWith: UnresolvedWith =>
        // Rewrite each CTE definition by applying the rule to its SubqueryAlias
        val newCteRelations = unresolvedWith.cteRelations.map {
          case (aliasName, subqueryAlias) =>
            val newSubqueryAlias = apply(subqueryAlias).asInstanceOf[SubqueryAlias]
            (aliasName, newSubqueryAlias)
        }
        val modifiedCTEUnresolvedWith = unresolvedWith.copy(cteRelations = newCteRelations)
        printf("Modified CTE UnresolvedWith: %s", modifiedCTEUnresolvedWith.toString())
        modifiedCTEUnresolvedWith
      case otherPlan: LogicalPlan =>
        val newChildren = otherPlan.children.map(child => apply(child))
        val modified_plan = otherPlan.withNewChildren(newChildren)
        modified_plan
    }
    // Print the entire transformed logical plan
    printf("Transformed Logical Plan: %s", transformedPlan.toString())
    transformedPlan
  }
}
{code}
Then I register my rule and execute it:
{code:java}
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

val rules: ListBuffer[Rule[LogicalPlan]] = ListBuffer.empty
rules += RenameTableRule(refs)
// Create RuleExecutor with the provided rules
val optimizer = new RuleExecutor[LogicalPlan] {
  val batches = Seq(
    Batch("NameChange", Once, rules.toList: _*)
  )
}
val logicalPlan = CatalystSqlParser.parsePlan(this.sqlQuery)
printf("Before Rule Master Plan: %s", logicalPlan.toString())
val logicalPlanRewrite = optimizer.execute(logicalPlan)
printf("After Rule Master Plan: %s", logicalPlanRewrite.toString())
{code}
With the code above, when I run the SQL below and provide the list
("abc", "schema1", "t1"), ("def", "schema2", "t2"):
{code:java}
SELECT * FROM XXX.abc a JOIN XXX.def b ON a.id = b.id{code}
the resulting logical plan is equivalent to the logical plan of this SQL:
{code:java}
SELECT * FROM schema1.t1 a JOIN schema2.t2 b ON a.id = b.id{code}
That is the correct behavior.
With a CTE, however, the result is not correct. The SQL is below, with the same
list ("abc", "schema1", "t1"), ("def", "schema2", "t2"):
{code:java}
WITH x AS (
  SELECT
    id,
    country
  FROM XXX.abc
),
y AS (
  SELECT
    id,
    gender
  FROM XXX.def
)
SELECT x.id, x.country, y.gender FROM x JOIN y ON x.id = y.id{code}
After the rule executes, the table names in the logical plan are unchanged.
Here are the logical plans before and after the rule:
{code:java}
Before Rule Master Plan: CTE [x, y]
:  :- 'SubqueryAlias x
:  :  +- 'Project ['id, 'country]
:  :     +- 'UnresolvedRelation [XXX, abc], [], false
:  +- 'SubqueryAlias y
:     +- 'Project ['id, 'gender]
:        +- 'UnresolvedRelation [XXX, def], [], false
+- 'Project ['x.id, 'x.country, 'y.gender]
   +- 'Join Inner, ('x.id = 'y.id)
      :- 'UnresolvedRelation [x], [], false
      +- 'UnresolvedRelation [y], [], false{code}
{code:java}
After Rule Master Plan: CTE [x, y]
:  :- 'SubqueryAlias x
:  :  +- 'Project ['id, 'country]
:  :     +- 'UnresolvedRelation [XXX, abc], [], false
:  +- 'SubqueryAlias y
:     +- 'Project ['id, 'gender]
:        +- 'UnresolvedRelation [XXX, def], [], false
+- 'Project ['x.id, 'x.country, 'y.gender]
   +- 'Join Inner, ('x.id = 'y.id)
      :- 'UnresolvedRelation [x], [], false
      +- 'UnresolvedRelation [y], [], false{code}
From the printout, I do see:
{code:java}
Modified CTE UnresolvedWith: CTE [x, y]
:  :- 'SubqueryAlias x
:  :  +- 'Project ['id, 'country]
:  :     +- 'UnresolvedRelation [schema1, t1], [], false
:  +- 'SubqueryAlias y
:     +- 'Project ['id, 'gender]
:        +- 'UnresolvedRelation [schema2, t2], [], false
+- 'Project ['x.id, 'x.country, 'y.gender]
   +- 'Join Inner, ('x.id = 'y.id)
      :- 'UnresolvedRelation [x], [], false
      +- 'UnresolvedRelation [y], [], false{code}
This means the CTE case is executed, but its result is not carried over by the
RuleExecutor. I think the rule executor only replaces a LogicalPlan's children,
and since the CTE relations are not among the LogicalPlan's children, they did
not get replaced.
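
One thing that may be worth ruling out alongside the hypothesis above: in Scala, a `return` inside a closure exits the enclosing method, so `return plan` in the rule hands back the original, unmodified plan from `apply` as soon as `transform` reaches a single-part name such as `x` or `y` in the main query. The following is a minimal sketch of a variant that avoids `return` and rewrites `cteRelations` explicitly. It assumes the Spark 3.3/3.4 shape of `UnresolvedWith` (a `cteRelations: Seq[(String, SubqueryAlias)]` field) and uses a hypothetical `NamedQueryRef` definition inferred from the fields used in the report. It has not been verified against 3.3.1 or 3.4.1 and is not a confirmed fix.

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, UnresolvedWith}
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical stand-in for the reporter's NamedQueryRef; field names are
// taken from the usages in the report, the definition itself is assumed.
case class NamedQueryRef(
    nqName: String,
    materializedInSchema: String,
    materializedInTableName: String)

// Hypothetical variant of RenameTableRule that contains no `return`.
case class RenameTableRuleNoReturn(refs: Seq[NamedQueryRef]) extends Rule[LogicalPlan] {

  // Replace XXX.<name> when <name> is in refs; otherwise keep the relation as-is.
  private def rename(rel: UnresolvedRelation): UnresolvedRelation =
    rel.multipartIdentifier match {
      case Seq("XXX", table) =>
        refs.find(_.nqName == table)
          .map(r => rel.copy(multipartIdentifier =
            Seq(r.materializedInSchema, r.materializedInTableName)))
          .getOrElse(rel)
      case _ => rel // single-part names (e.g. CTE aliases) are left untouched
    }

  def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    case rel: UnresolvedRelation => rename(rel)
    case w: UnresolvedWith =>
      // CTE definitions are assumed not to be regular children here,
      // so they are rewritten by hand; the main query is handled by transformDown.
      w.copy(cteRelations = w.cteRelations.map {
        case (name, alias) => (name, apply(alias).asInstanceOf[SubqueryAlias])
      })
  }
}
```

If this variant produces the renamed relations inside the CTE when run through the same RuleExecutor, the difference would point at the `return` statements rather than at the executor itself.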