[
https://issues.apache.org/jira/browse/SPARK-49192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rommel Yuan updated SPARK-49192:
--------------------------------
Description:
I want to set up a LogicalPlan rule that replaces table names: given a list of
table names, any table whose name matches is replaced with something else.
Here is the code:
{code:java}
case class RenameTableRule(refs: List[NamedQueryRef]) extends Rule[LogicalPlan]
{
def apply(plan: LogicalPlan): LogicalPlan = {
// println(plan.getClass.getName)
val transformedPlan = plan transform {
case unresolvedRelation: UnresolvedRelation =>
val tblSchemaName = unresolvedRelation.tableName.split("\\.")
if (tblSchemaName.length == 1) return plan
val schema = tblSchemaName.apply(0)
val tblName = tblSchemaName.apply(1)
for (ref <- this.refs) {
if (schema == "XXX" && tblName == ref.nqName) return
unresolvedRelation.copy(multipartIdentifier = Seq(ref.materializedInSchema,
ref.materializedInTableName), unresolvedRelation.options,
unresolvedRelation.isStreaming)
}
unresolvedRelation
case unresolvedWith: UnresolvedWith =>
val newCteRelations = unresolvedWith.cteRelations.map {
case (aliasName, subqueryAlias) =>
val newSubqueryAlias =
apply(subqueryAlias).asInstanceOf[SubqueryAlias]
(aliasName, newSubqueryAlias)
}
val modifiedCTEUnresolvedWith = unresolvedWith.copy(cteRelations =
newCteRelations)
printf("Modified CTE UnresolvedWith: %s",
modifiedCTEUnresolvedWith.toString())
modifiedCTEUnresolvedWith
case otherPlan: LogicalPlan =>
val newChildren = otherPlan.children.map(child => apply(child))
val modified_plan = otherPlan.withNewChildren(newChildren)
modified_plan
}
// Print the entire transformed logical plan
printf("Transformed Logical Plan: %s", transformedPlan.toString())
transformedPlan
}
} {code}
I register the rule and execute it as follows:
{code:java}
val rules: ListBuffer[Rule[LogicalPlan]] = ListBuffer.empty
rules += RenameTableRule(refs) // Create RuleExecutor with the provided rules
val optimizer = new RuleExecutor[LogicalPlan] {
val batches = Seq(
Batch("NameChange", Once, rules.toList: _*)
)
}
val logicalPlan = CatalystSqlParser.parsePlan(this.sqlQuery)
printf("Before Rule Master Plan: %s", logicalPlan.toString())
val logicalPlanRewrite = optimizer.execute(logicalPlan)
printf("After Rule Master Plan: %s", logicalPlanRewrite.toString()){code}
With the code snippet above, when I execute the SQL below with the list of
mappings ("abc", "schema1", "t1"), ("def", "schema2", "t2")
{code:java}
SELECT * FROM XXX.abc a JOIN XXX.def b ON a.id = b.id{code}
the resulting logical plan is equivalent to the logical plan of this SQL:
{code:java}
SELECT * FROM schema1.t1 a JOIN schema2.t2 b ON a.id = b.id{code}
That is the correct behavior.
But with a CTE, the result is not correct. Here is the SQL, again with the list of
("abc", "schema1", "t1"), ("def", "schema2", "t2")
{code:java}
WITH x AS (
SELECT
id,
country
FROM XXX.abc
),
y AS (
SELECT
id,
gender FROM XXX.def
)
SELECT x.id, x.country, y.gender FROM x JOIN y ON x.id = y.id{code}
After the rule executes, the table names in the logical plan are unchanged.
Here are the logical plans before and after the rule runs:
{code:java}
Before Rule Master Plan: CTE [x, y]
: :- 'SubqueryAlias x
: : +- 'Project ['id, 'country]
: : +- 'UnresolvedRelation [XXX, abc], [], false
: +- 'SubqueryAlias y
: +- 'Project ['id, 'gender]
: +- 'UnresolvedRelation [XXX, def], [], false
+- 'Project ['x.id, 'x.country, 'y.gender]
+- 'Join Inner, ('x.id = 'y.id)
:- 'UnresolvedRelation [x], [], false
+- 'UnresolvedRelation [y], [], false{code}
{code:java}
After Rule Master Plan: CTE [x, y]
: :- 'SubqueryAlias x
: : +- 'Project ['id, 'country]
: : +- 'UnresolvedRelation [XXX abc], [], false
: +- 'SubqueryAlias y
: +- 'Project ['id, 'gender]
: +- 'UnresolvedRelation [XXX, def], [], false
+- 'Project ['x.id, 'x.country, 'y.gender]
+- 'Join Inner, ('x.id = 'y.id)
:- 'UnresolvedRelation [x], [], false
+- 'UnresolvedRelation [y], [], false {code}
However, the debug printout from inside the rule does show the rewritten names:
{code:java}
Modified CTE UnresolvedWith: CTE [x, y]
: :- 'SubqueryAlias x
: : +- 'Project ['id, 'country]
: : +- 'UnresolvedRelation [schema1, t1], [], false
: +- 'SubqueryAlias y
: +- 'Project ['id, 'gender]
: +- 'UnresolvedRelation [schema2, t2], [], false
+- 'Project ['x.id, 'x.country, 'y.gender]
+- 'Join Inner, ('x.id = 'y.id)
:- 'UnresolvedRelation [x], [], false
+- 'UnresolvedRelation [y], [], false{code}
This means the CTE case is executed, but its result is not carried over by the
RuleExecutor. I think the rule executor only replaces a LogicalPlan's
{*}children{*}; since the CTE definitions are not children of the LogicalPlan,
they do not get replaced.
was:
I want to setup a logicalplan rule to do the table name replacement by giving
the list of table names, and if the table name is a match, replace it with
something else.
here is the code
{code:java}
case class RenameTableRule(refs: List[NamedQueryRef]) extends Rule[LogicalPlan]
{
def apply(plan: LogicalPlan): LogicalPlan = {
// println(plan.getClass.getName)
val transformedPlan = plan transform {
case unresolvedRelation: UnresolvedRelation =>
val tblSchemaName = unresolvedRelation.tableName.split("\\.")
if (tblSchemaName.length == 1) return plan
val schema = tblSchemaName.apply(0)
val tblName = tblSchemaName.apply(1)
for (ref <- this.refs) {
if (schema == "XXX" && tblName == ref.nqName) return
unresolvedRelation.copy(multipartIdentifier = Seq(ref.materializedInSchema,
ref.materializedInTableName), unresolvedRelation.options,
unresolvedRelation.isStreaming)
}
unresolvedRelation
case unresolvedWith: UnresolvedWith =>
val newCteRelations = unresolvedWith.cteRelations.map {
case (aliasName, subqueryAlias) =>
val newSubqueryAlias =
apply(subqueryAlias).asInstanceOf[SubqueryAlias]
(aliasName, newSubqueryAlias)
}
val modifiedCTEUnresolvedWith = unresolvedWith.copy(cteRelations =
newCteRelations)
printf("Modified CTE UnresolvedWith: %s",
modifiedCTEUnresolvedWith.toString())
modifiedCTEUnresolvedWith
case otherPlan: LogicalPlan =>
val newChildren = otherPlan.children.map(child => apply(child))
val modified_plan = otherPlan.withNewChildren(newChildren)
modified_plan
}
// Print the entire transformed logical plan
printf("Transformed Logical Plan: %s", transformedPlan.toString())
transformedPlan
}
} {code}
and i register my rule and execute
{code:java}
val rules: ListBuffer[Rule[LogicalPlan]] = ListBuffer.empty
rules += RenameTableRule(refs) // Create RuleExecutor with the provided
rules
val optimizer = new RuleExecutor[LogicalPlan] {
val batches = Seq(
Batch("NameChange", Once, rules.toList: _*)
)
}
val logicalPlan = CatalystSqlParser.parsePlan(this.sqlQuery)
printf("Before Rule Master Plan: %s", logicalPlan.toString())
val logicalPlanRewrite = optimizer.execute(logicalPlan)
printf("After Rule Master Plan: %s", logicalPlanRewrite.toString()){code}
with code snippet above, when i execute the sql below by providing list of
("abc", "schema1", "t1"), ("def", "schema2", "t2")
{code:java}
SELECT * FROM XXX.abc a JOIN XXX.def b ON a.id = b.id
The logical plan is equivalent to the logical plan of the SQL below
SELECT * FROM schema1.t1 a JOIN schema2.t2 b ON a.id = b.id{code}
That is the correct behavior
But with CTE, it is not the correct. The SQL is below by providing list of
("abc", "schema1", "t1"), ("def", "schema2", "t2")
{code:java}
WITH x AS (
SELECT
id,
country
FROM XXX.abc
),
y AS (
SELECT
id,
gender FROM XXX.def
)
SELECT x.id, x.country, y.gender FROM x JOIN y ON x.id = y.id{code}
after the rule execution, the logical plan doesn't change the table name. Here
is the logical of the before and master
{code:java}
Before Rule Master Plan: CTE [x, y]
: :- 'SubqueryAlias x
: : +- 'Project ['id, 'country]
: : +- 'UnresolvedRelation [XXX, abc], [], false
: +- 'SubqueryAlias y
: +- 'Project ['id, 'gender]
: +- 'UnresolvedRelation [XXX, def], [], false
+- 'Project ['x.id, 'x.country, 'y.gender]
+- 'Join Inner, ('x.id = 'y.id)
:- 'UnresolvedRelation [x], [], false
+- 'UnresolvedRelation [y], [], false{code}
{code:java}
After Rule Master Plan: CTE [x, y]
: :- 'SubqueryAlias x
: : +- 'Project ['id, 'country]
: : +- 'UnresolvedRelation [XXX abc], [], false
: +- 'SubqueryAlias y
: +- 'Project ['id, 'gender]
: +- 'UnresolvedRelation [XXX, def], [], false
+- 'Project ['x.id, 'x.country, 'y.gender]
+- 'Join Inner, ('x.id = 'y.id)
:- 'UnresolvedRelation [x], [], false
+- 'UnresolvedRelation [y], [], false {code}
and from the print out, i do see
{code:java}
Modified CTE UnresolvedWith: CTE [x, y]
: :- 'SubqueryAlias x
: : +- 'Project ['id, 'country]
: : +- 'UnresolvedRelation [schema1, t1], [], false
: +- 'SubqueryAlias y
: +- 'Project ['id, 'gender]
: +- 'UnresolvedRelation [schema2, t2], [], false
+- 'Project ['x.id, 'x.country, 'y.gender]
+- 'Join Inner, ('x.id = 'y.id)
:- 'UnresolvedRelation [x], [], false
+- 'UnresolvedRelation [y], [], false{code}
which means the CTE case is executed, but not carried over by the RuleExecutor
I think in the rule executor, it only replaces any logicalplan's children, but
since CTE is not logicalplan's children, it didn't get replaced
> RuleExecutor doesn't work on CTE UnresolvedWith
> -----------------------------------------------
>
> Key: SPARK-49192
> URL: https://issues.apache.org/jira/browse/SPARK-49192
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, SQL
> Affects Versions: 3.3.1, 3.4.1
> Reporter: Rommel Yuan
> Priority: Major
>
> I want to set up a LogicalPlan rule that replaces table names: given a list
> of table names, any table whose name matches is replaced with something else.
> Here is the code:
> {code:java}
> case class RenameTableRule(refs: List[NamedQueryRef]) extends
> Rule[LogicalPlan] {
> def apply(plan: LogicalPlan): LogicalPlan = {
> // println(plan.getClass.getName)
> val transformedPlan = plan transform {
> case unresolvedRelation: UnresolvedRelation =>
> val tblSchemaName = unresolvedRelation.tableName.split("\\.")
> if (tblSchemaName.length == 1) return plan
> val schema = tblSchemaName.apply(0)
> val tblName = tblSchemaName.apply(1)
> for (ref <- this.refs) {
> if (schema == "XXX" && tblName == ref.nqName) return
> unresolvedRelation.copy(multipartIdentifier = Seq(ref.materializedInSchema,
> ref.materializedInTableName), unresolvedRelation.options,
> unresolvedRelation.isStreaming)
> }
> unresolvedRelation
> case unresolvedWith: UnresolvedWith =>
> val newCteRelations = unresolvedWith.cteRelations.map {
> case (aliasName, subqueryAlias) =>
> val newSubqueryAlias =
> apply(subqueryAlias).asInstanceOf[SubqueryAlias]
> (aliasName, newSubqueryAlias)
> }
> val modifiedCTEUnresolvedWith = unresolvedWith.copy(cteRelations =
> newCteRelations)
> printf("Modified CTE UnresolvedWith: %s",
> modifiedCTEUnresolvedWith.toString())
> modifiedCTEUnresolvedWith
> case otherPlan: LogicalPlan =>
> val newChildren = otherPlan.children.map(child => apply(child))
> val modified_plan = otherPlan.withNewChildren(newChildren)
> modified_plan
> }
> // Print the entire transformed logical plan
> printf("Transformed Logical Plan: %s", transformedPlan.toString())
> transformedPlan
> }
> } {code}
> I register the rule and execute it as follows:
> {code:java}
> val rules: ListBuffer[Rule[LogicalPlan]] = ListBuffer.empty
> rules += RenameTableRule(refs) // Create RuleExecutor with the provided
> rules
> val optimizer = new RuleExecutor[LogicalPlan] {
> val batches = Seq(
> Batch("NameChange", Once, rules.toList: _*)
> )
> }
> val logicalPlan = CatalystSqlParser.parsePlan(this.sqlQuery)
> printf("Before Rule Master Plan: %s", logicalPlan.toString())
> val logicalPlanRewrite = optimizer.execute(logicalPlan)
> printf("After Rule Master Plan: %s", logicalPlanRewrite.toString()){code}
> With the code snippet above, when I execute the SQL below with the list of
> mappings ("abc", "schema1", "t1"), ("def", "schema2", "t2")
> {code:java}
> SELECT * FROM XXX.abc a JOIN XXX.def b ON a.id = b.id{code}
> the resulting logical plan is equivalent to the logical plan of this SQL:
> {code:java}
> SELECT * FROM schema1.t1 a JOIN schema2.t2 b ON a.id = b.id{code}
> That is the correct behavior.
>
> But with a CTE, the result is not correct. Here is the SQL, again with the list of
> ("abc", "schema1", "t1"), ("def", "schema2", "t2")
> {code:java}
> WITH x AS (
> SELECT
> id,
> country
> FROM XXX.abc
> ),
> y AS (
> SELECT
> id,
> gender FROM XXX.def
> )
> SELECT x.id, x.country, y.gender FROM x JOIN y ON x.id = y.id{code}
> After the rule executes, the table names in the logical plan are unchanged.
> Here are the logical plans before and after the rule runs:
>
> {code:java}
> Before Rule Master Plan: CTE [x, y]
> : :- 'SubqueryAlias x
> : : +- 'Project ['id, 'country]
> : : +- 'UnresolvedRelation [XXX, abc], [], false
> : +- 'SubqueryAlias y
> : +- 'Project ['id, 'gender]
> : +- 'UnresolvedRelation [XXX, def], [], false
> +- 'Project ['x.id, 'x.country, 'y.gender]
> +- 'Join Inner, ('x.id = 'y.id)
> :- 'UnresolvedRelation [x], [], false
> +- 'UnresolvedRelation [y], [], false{code}
> {code:java}
> After Rule Master Plan: CTE [x, y]
> : :- 'SubqueryAlias x
> : : +- 'Project ['id, 'country]
> : : +- 'UnresolvedRelation [XXX abc], [], false
> : +- 'SubqueryAlias y
> : +- 'Project ['id, 'gender]
> : +- 'UnresolvedRelation [XXX, def], [], false
> +- 'Project ['x.id, 'x.country, 'y.gender]
> +- 'Join Inner, ('x.id = 'y.id)
> :- 'UnresolvedRelation [x], [], false
> +- 'UnresolvedRelation [y], [], false {code}
> However, the debug printout from inside the rule does show the rewritten names:
> {code:java}
> Modified CTE UnresolvedWith: CTE [x, y]
> : :- 'SubqueryAlias x
> : : +- 'Project ['id, 'country]
> : : +- 'UnresolvedRelation [schema1, t1], [], false
> : +- 'SubqueryAlias y
> : +- 'Project ['id, 'gender]
> : +- 'UnresolvedRelation [schema2, t2], [], false
> +- 'Project ['x.id, 'x.country, 'y.gender]
> +- 'Join Inner, ('x.id = 'y.id)
> :- 'UnresolvedRelation [x], [], false
> +- 'UnresolvedRelation [y], [], false{code}
> This means the CTE case is executed, but its result is not carried over by the
> RuleExecutor.
>
> I think the rule executor only replaces a LogicalPlan's {*}children{*}; since
> the CTE definitions are not children of the LogicalPlan, they do not get
> replaced.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]