[ 
https://issues.apache.org/jira/browse/SPARK-49192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rommel Yuan updated SPARK-49192:
--------------------------------
    Description: 
I want to set up a LogicalPlan rule that replaces table names: given a list of 
table names, if a table name matches, it is replaced with something else.

Here is the code:
{code:java}
// Rule that rewrites `XXX.<name>` table references into
// `<materializedInSchema>.<materializedInTableName>` of the NamedQueryRef in
// `refs` whose `nqName` equals <name>; other relations are returned as-is.
//
// NOTE(review): root cause of the behaviour described in this issue.
// Every `return` below sits inside the `plan transform { ... }` closure, so it
// is a Scala *non-local* return. It exits the `apply` invocation that created
// the closure, not just the current `case`, and abandons the rest of `transform`.
// For the CTE query, the UnresolvedWith case does rewrite the CTE definitions
// (the "Modified CTE UnresolvedWith" print-out shows schema1.t1 / schema2.t2).
// But `transform` then continues top-down into the main query and reaches
// 'UnresolvedRelation [x] and [y]. Those single-part names hit
// `if (tblSchemaName.length == 1) return plan`, which leaves the OUTERMOST
// `apply` with its original, unrewritten `plan`. RuleExecutor therefore gets
// the input plan back unchanged.
// In the plain join query, both relations are two-part and are already
// rewritten by the `otherPlan` recursion before the top-level closure sees
// them. Neither `return` fires there, which is why that case appears to work.
// Likewise, if the top-level closure ever met a not-yet-rewritten XXX
// relation, the other `return` would replace the whole plan with that single
// relation.
//
// Suggested fix (sketch, not compiled here): let every case yield a value
// instead of using `return`, e.g.
//   case r: UnresolvedRelation =>
//     r.multipartIdentifier match {
//       case Seq("XXX", name) =>
//         refs.find(_.nqName == name)
//           .map(ref => r.copy(multipartIdentifier =
//             Seq(ref.materializedInSchema, ref.materializedInTableName)))
//           .getOrElse(r)
//       case _ => r
//     }
// Also drop the `otherPlan` case, because `transform` already recurses into
// children. Keep the UnresolvedWith case: the CTE definitions are presumably
// not among UnresolvedWith's `children` (they print as ':' inner children in
// the plan dumps). Confirm this for the Spark version in use.
case class RenameTableRule(refs: List[NamedQueryRef]) extends Rule[LogicalPlan] 
{
  def apply(plan: LogicalPlan): LogicalPlan = {
    // println(plan.getClass.getName)
    val transformedPlan = plan transform {
      case unresolvedRelation: UnresolvedRelation =>
        // NOTE(review): tableName presumably joins the name parts with "."
        // (quoting parts as needed). Splitting it again on "." then misreads
        // quoted parts that contain dots, and ignores any third part
        // (catalog.schema.table). Matching on multipartIdentifier avoids both.
        val tblSchemaName = unresolvedRelation.tableName.split("\\.")
        // Non-local return: exits the enclosing apply() with that call's
        // ORIGINAL `plan`, discarding everything rewritten so far.
        if (tblSchemaName.length == 1) return plan

        val schema = tblSchemaName.apply(0)
        val tblName = tblSchemaName.apply(1)
        for (ref <- this.refs) {
          // NOTE(review): this is also a non-local return. As wrapped in this
          // email, `return` ends its line, and in Scala a newline after
          // `return` terminates the statement. Rejoin it with the `copy(...)`
          // expression below before compiling.
          if (schema == "XXX" && tblName == ref.nqName) return 
unresolvedRelation.copy(multipartIdentifier = Seq(ref.materializedInSchema, 
ref.materializedInTableName), unresolvedRelation.options, 
unresolvedRelation.isStreaming)
        }
        unresolvedRelation

      case unresolvedWith: UnresolvedWith =>
        // Rewrite each CTE definition by calling the whole rule on it
        // explicitly, then rebuild the UnresolvedWith with the results.
        val newCteRelations = unresolvedWith.cteRelations.map {
          case (aliasName, subqueryAlias) =>
            val newSubqueryAlias = 
apply(subqueryAlias).asInstanceOf[SubqueryAlias]
            (aliasName, newSubqueryAlias)
        }          
        val modifiedCTEUnresolvedWith = unresolvedWith.copy(cteRelations = 
newCteRelations)
        printf("Modified CTE UnresolvedWith: %s", 
modifiedCTEUnresolvedWith.toString())
        modifiedCTEUnresolvedWith

      case otherPlan: LogicalPlan =>
        // Re-applies the whole rule to every child by hand. `transform` also
        // descends into the returned children, so subtrees are processed
        // more than once.
        val newChildren = otherPlan.children.map(child => apply(child))
        val modified_plan = otherPlan.withNewChildren(newChildren)

        modified_plan
    }

    // Print the entire transformed logical plan
    // NOTE(review): for the CTE query this line is never reached in the
    // outermost apply(), because `return plan` above exits first. The format
    // string has no trailing "\n", so consecutive print-outs run together.
    printf("Transformed Logical Plan: %s", transformedPlan.toString())
    transformedPlan
  }
} {code}
I register my rule and execute it:
{code:java}
// Repro driver: run RenameTableRule once over the parsed query and print the
// plan before and after. println ends each dump with a newline so the two
// plans do not run together on the console.
val rules: List[Rule[LogicalPlan]] = List(RenameTableRule(refs))
// Create a RuleExecutor with a single batch that applies the rules once
val optimizer = new RuleExecutor[LogicalPlan] {
  val batches = Seq(
    Batch("NameChange", Once, rules: _*)
  )
}
val logicalPlan = CatalystSqlParser.parsePlan(this.sqlQuery)
println(s"Before Rule Master Plan: $logicalPlan")
val logicalPlanRewrite = optimizer.execute(logicalPlan)
println(s"After Rule Master Plan: $logicalPlanRewrite"){code}
With the code snippet above, when I execute the SQL below with the list
("abc", "schema1", "t1"), ("def", "schema2", "t2")
{code:java}
SELECT * FROM XXX.abc a JOIN XXX.def b ON a.id = b.id{code}
The resulting logical plan is equivalent to the logical plan of this SQL:
{code:java}
SELECT * FROM schema1.t1 a JOIN schema2.t2 b ON a.id = b.id{code}
That is the correct behavior.

 
But with a CTE it does not work correctly. Here is the SQL, again with the list
("abc", "schema1", "t1"), ("def", "schema2", "t2"):
{code:java}
WITH x AS (
SELECT 
  id, 
  country 
FROM XXX.abc
),
y AS (
SELECT 
  id, 
  gender FROM XXX.def
)
SELECT x.id, x.country, y.gender FROM x JOIN y ON x.id = y.id{code}
After the rule executes, the table names in the logical plan are unchanged. 
Here are the logical plans before and after:
 
{code:java}
Before Rule Master Plan: CTE [x, y]
:  :- 'SubqueryAlias x
:  :  +- 'Project ['id, 'country]
:  :     +- 'UnresolvedRelation [XXX, abc], [], false
:  +- 'SubqueryAlias y
:     +- 'Project ['id, 'gender]
:        +- 'UnresolvedRelation [XXX, def], [], false
+- 'Project ['x.id, 'x.country, 'y.gender]
   +- 'Join Inner, ('x.id = 'y.id)
      :- 'UnresolvedRelation [x], [], false
      +- 'UnresolvedRelation [y], [], false{code}
{code:java}
After Rule Master Plan: CTE [x, y]
:  :- 'SubqueryAlias x
:  :  +- 'Project ['id, 'country]
:  :     +- 'UnresolvedRelation [XXX abc], [], false
:  +- 'SubqueryAlias y
:     +- 'Project ['id, 'gender]
:        +- 'UnresolvedRelation [XXX, def], [], false
+- 'Project ['x.id, 'x.country, 'y.gender]
   +- 'Join Inner, ('x.id = 'y.id)
      :- 'UnresolvedRelation [x], [], false
      +- 'UnresolvedRelation [y], [], false {code}
And in the printed output I do see:
{code:java}
Modified CTE UnresolvedWith: CTE [x, y]
:  :- 'SubqueryAlias x
:  :  +- 'Project ['id, 'country]
:  :     +- 'UnresolvedRelation [schema1, t1], [], false
:  +- 'SubqueryAlias y
:     +- 'Project ['id, 'gender]
:        +- 'UnresolvedRelation [schema2, t2], [], false
+- 'Project ['x.id, 'x.country, 'y.gender]
   +- 'Join Inner, ('x.id = 'y.id)
      :- 'UnresolvedRelation [x], [], false
      +- 'UnresolvedRelation [y], [], false{code}
which means the CTE case is executed, but its result is not carried over by the RuleExecutor.
 
I think the rule executor only replaces a LogicalPlan's {*}children{*}, and 
since the CTE definitions are not children of the LogicalPlan, they don't get 
replaced.
 

  was:
I want to set up a LogicalPlan rule that replaces table names: given a list of 
table names, if a table name matches, it is replaced with something else.

Here is the code:
{code:java}
// Rule that rewrites `XXX.<name>` table references into
// `<materializedInSchema>.<materializedInTableName>` of the NamedQueryRef in
// `refs` whose `nqName` equals <name>; other relations are returned as-is.
//
// NOTE(review): root cause of the behaviour described in this issue.
// Every `return` below sits inside the `plan transform { ... }` closure, so it
// is a Scala *non-local* return. It exits the `apply` invocation that created
// the closure, not just the current `case`, and abandons the rest of `transform`.
// For the CTE query, the UnresolvedWith case does rewrite the CTE definitions
// (the "Modified CTE UnresolvedWith" print-out shows schema1.t1 / schema2.t2).
// But `transform` then continues top-down into the main query and reaches
// 'UnresolvedRelation [x] and [y]. Those single-part names hit
// `if (tblSchemaName.length == 1) return plan`, which leaves the OUTERMOST
// `apply` with its original, unrewritten `plan`. RuleExecutor therefore gets
// the input plan back unchanged.
// In the plain join query, both relations are two-part and are already
// rewritten by the `otherPlan` recursion before the top-level closure sees
// them. Neither `return` fires there, which is why that case appears to work.
// Likewise, if the top-level closure ever met a not-yet-rewritten XXX
// relation, the other `return` would replace the whole plan with that single
// relation.
//
// Suggested fix (sketch, not compiled here): let every case yield a value
// instead of using `return`, e.g.
//   case r: UnresolvedRelation =>
//     r.multipartIdentifier match {
//       case Seq("XXX", name) =>
//         refs.find(_.nqName == name)
//           .map(ref => r.copy(multipartIdentifier =
//             Seq(ref.materializedInSchema, ref.materializedInTableName)))
//           .getOrElse(r)
//       case _ => r
//     }
// Also drop the `otherPlan` case, because `transform` already recurses into
// children. Keep the UnresolvedWith case: the CTE definitions are presumably
// not among UnresolvedWith's `children` (they print as ':' inner children in
// the plan dumps). Confirm this for the Spark version in use.
case class RenameTableRule(refs: List[NamedQueryRef]) extends Rule[LogicalPlan] 
{
  def apply(plan: LogicalPlan): LogicalPlan = {
    // println(plan.getClass.getName)
    val transformedPlan = plan transform {
      case unresolvedRelation: UnresolvedRelation =>
        // NOTE(review): tableName presumably joins the name parts with "."
        // (quoting parts as needed). Splitting it again on "." then misreads
        // quoted parts that contain dots, and ignores any third part
        // (catalog.schema.table). Matching on multipartIdentifier avoids both.
        val tblSchemaName = unresolvedRelation.tableName.split("\\.")
        // Non-local return: exits the enclosing apply() with that call's
        // ORIGINAL `plan`, discarding everything rewritten so far.
        if (tblSchemaName.length == 1) return plan

        val schema = tblSchemaName.apply(0)
        val tblName = tblSchemaName.apply(1)
        for (ref <- this.refs) {
          // NOTE(review): this is also a non-local return. As wrapped in this
          // email, `return` ends its line, and in Scala a newline after
          // `return` terminates the statement. Rejoin it with the `copy(...)`
          // expression below before compiling.
          if (schema == "XXX" && tblName == ref.nqName) return 
unresolvedRelation.copy(multipartIdentifier = Seq(ref.materializedInSchema, 
ref.materializedInTableName), unresolvedRelation.options, 
unresolvedRelation.isStreaming)
        }
        unresolvedRelation

      case unresolvedWith: UnresolvedWith =>
        // Rewrite each CTE definition by calling the whole rule on it
        // explicitly, then rebuild the UnresolvedWith with the results.
        val newCteRelations = unresolvedWith.cteRelations.map {
          case (aliasName, subqueryAlias) =>
            val newSubqueryAlias = 
apply(subqueryAlias).asInstanceOf[SubqueryAlias]
            (aliasName, newSubqueryAlias)
        }          
        val modifiedCTEUnresolvedWith = unresolvedWith.copy(cteRelations = 
newCteRelations)
        printf("Modified CTE UnresolvedWith: %s", 
modifiedCTEUnresolvedWith.toString())
        modifiedCTEUnresolvedWith

      case otherPlan: LogicalPlan =>
        // Re-applies the whole rule to every child by hand. `transform` also
        // descends into the returned children, so subtrees are processed
        // more than once.
        val newChildren = otherPlan.children.map(child => apply(child))
        val modified_plan = otherPlan.withNewChildren(newChildren)

        modified_plan
    }

    // Print the entire transformed logical plan
    // NOTE(review): for the CTE query this line is never reached in the
    // outermost apply(), because `return plan` above exits first. The format
    // string has no trailing "\n", so consecutive print-outs run together.
    printf("Transformed Logical Plan: %s", transformedPlan.toString())
    transformedPlan
  }
} {code}

I register my rule and execute it:
{code:java}
 // Repro driver: run RenameTableRule once over the parsed query and print the
// plan before and after. println ends each dump with a newline so the two
// plans do not run together on the console. (The email-wrapped comment in the
// previous version left a stray `rules` expression statement; it is gone.)
val rules: List[Rule[LogicalPlan]] = List(RenameTableRule(refs))
// Create a RuleExecutor with a single batch that applies the rules once
val optimizer = new RuleExecutor[LogicalPlan] {
  val batches = Seq(
    Batch("NameChange", Once, rules: _*)
  )
}
val logicalPlan = CatalystSqlParser.parsePlan(this.sqlQuery)
println(s"Before Rule Master Plan: $logicalPlan")
val logicalPlanRewrite = optimizer.execute(logicalPlan)
println(s"After Rule Master Plan: $logicalPlanRewrite"){code}

With the code snippet above, when I execute the SQL below with the list
("abc", "schema1", "t1"), ("def", "schema2", "t2")
{code:java}
SELECT * FROM XXX.abc a JOIN XXX.def b ON a.id = b.id{code}
The resulting logical plan is equivalent to the logical plan of this SQL:
{code:java}
SELECT * FROM schema1.t1 a JOIN schema2.t2 b ON a.id = b.id{code}
That is the correct behavior.

 
But with a CTE it does not work correctly. Here is the SQL, again with the list
("abc", "schema1", "t1"), ("def", "schema2", "t2"):
{code:java}
WITH x AS (
SELECT 
  id, 
  country 
FROM XXX.abc
),
y AS (
SELECT 
  id, 
  gender FROM XXX.def
)
SELECT x.id, x.country, y.gender FROM x JOIN y ON x.id = y.id{code}
After the rule executes, the table names in the logical plan are unchanged. 
Here are the logical plans before and after:
 
{code:java}
Before Rule Master Plan: CTE [x, y]
:  :- 'SubqueryAlias x
:  :  +- 'Project ['id, 'country]
:  :     +- 'UnresolvedRelation [XXX, abc], [], false
:  +- 'SubqueryAlias y
:     +- 'Project ['id, 'gender]
:        +- 'UnresolvedRelation [XXX, def], [], false
+- 'Project ['x.id, 'x.country, 'y.gender]
   +- 'Join Inner, ('x.id = 'y.id)
      :- 'UnresolvedRelation [x], [], false
      +- 'UnresolvedRelation [y], [], false{code}
{code:java}
After Rule Master Plan: CTE [x, y]
:  :- 'SubqueryAlias x
:  :  +- 'Project ['id, 'country]
:  :     +- 'UnresolvedRelation [XXX abc], [], false
:  +- 'SubqueryAlias y
:     +- 'Project ['id, 'gender]
:        +- 'UnresolvedRelation [XXX, def], [], false
+- 'Project ['x.id, 'x.country, 'y.gender]
   +- 'Join Inner, ('x.id = 'y.id)
      :- 'UnresolvedRelation [x], [], false
      +- 'UnresolvedRelation [y], [], false {code}
And in the printed output I do see:
{code:java}
Modified CTE UnresolvedWith: CTE [x, y]
:  :- 'SubqueryAlias x
:  :  +- 'Project ['id, 'country]
:  :     +- 'UnresolvedRelation [schema1, t1], [], false
:  +- 'SubqueryAlias y
:     +- 'Project ['id, 'gender]
:        +- 'UnresolvedRelation [schema2, t2], [], false
+- 'Project ['x.id, 'x.country, 'y.gender]
   +- 'Join Inner, ('x.id = 'y.id)
      :- 'UnresolvedRelation [x], [], false
      +- 'UnresolvedRelation [y], [], false{code}
which means the CTE case is executed, but its result is not carried over by the RuleExecutor.
 
I think the rule executor only replaces a LogicalPlan's children, and since 
the CTE definitions are not children of the LogicalPlan, they don't get replaced.
 


> RuleExecutor doesn't work on CTE UnresolvedWith
> -----------------------------------------------
>
>                 Key: SPARK-49192
>                 URL: https://issues.apache.org/jira/browse/SPARK-49192
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 3.3.1, 3.4.1
>            Reporter: Rommel Yuan
>            Priority: Major
>
> I want to setup a logicalplan rule to do the table name replacement by giving 
> the list of table names, and if the table name is a match, replace it with 
> something else.
> here is the code 
> {code:java}
> case class RenameTableRule(refs: List[NamedQueryRef]) extends 
> Rule[LogicalPlan] {
>   def apply(plan: LogicalPlan): LogicalPlan = {
>     // println(plan.getClass.getName)
>     val transformedPlan = plan transform {
>       case unresolvedRelation: UnresolvedRelation =>
>         val tblSchemaName = unresolvedRelation.tableName.split("\\.")
>         if (tblSchemaName.length == 1) return plan
>         val schema = tblSchemaName.apply(0)
>         val tblName = tblSchemaName.apply(1)
>         for (ref <- this.refs) {
>           if (schema == "XXX" && tblName == ref.nqName) return 
> unresolvedRelation.copy(multipartIdentifier = Seq(ref.materializedInSchema, 
> ref.materializedInTableName), unresolvedRelation.options, 
> unresolvedRelation.isStreaming)
>         }
>         unresolvedRelation
>       case unresolvedWith: UnresolvedWith =>
>         val newCteRelations = unresolvedWith.cteRelations.map {
>           case (aliasName, subqueryAlias) =>
>             val newSubqueryAlias = 
> apply(subqueryAlias).asInstanceOf[SubqueryAlias]
>             (aliasName, newSubqueryAlias)
>         }          
>         val modifiedCTEUnresolvedWith = unresolvedWith.copy(cteRelations = 
> newCteRelations)
>         printf("Modified CTE UnresolvedWith: %s", 
> modifiedCTEUnresolvedWith.toString())
>         modifiedCTEUnresolvedWith
>       case otherPlan: LogicalPlan =>
>         val newChildren = otherPlan.children.map(child => apply(child))
>         val modified_plan = otherPlan.withNewChildren(newChildren)
>         modified_plan
>     }
>     // Print the entire transformed logical plan
>     printf("Transformed Logical Plan: %s", transformedPlan.toString())
>     transformedPlan
>   }
> } {code}
> and i register my rule and execute 
> {code:java}
> val rules: ListBuffer[Rule[LogicalPlan]] = ListBuffer.empty
> rules += RenameTableRule(refs)    // Create RuleExecutor with the provided 
> rules
> val optimizer = new RuleExecutor[LogicalPlan] {
>    val batches = Seq(
>      Batch("NameChange", Once, rules.toList: _*)
>    )
> }    
> val logicalPlan = CatalystSqlParser.parsePlan(this.sqlQuery)
> printf("Before Rule Master Plan: %s", logicalPlan.toString())
> val logicalPlanRewrite = optimizer.execute(logicalPlan)
> printf("After Rule Master Plan: %s", logicalPlanRewrite.toString()){code}
> with code snippet above, when i execute the sql below by providing list of
> ("abc", "schema1", "t1"), ("def", "schema2", "t2")
> {code:java}
> SELECT * FROM XXX.abc a JOIN XXX.def b ON a.id = b.id 
> The logical plan is equivalent to the logical plan of the SQL below
> SELECT * FROM schema1.t1 a JOIN schema2.t2 b ON a.id = b.id{code}
> That is the correct behavior
>  
> But with CTE, it is not the correct. The SQL is below by providing list of
> ("abc", "schema1", "t1"), ("def", "schema2", "t2")
> {code:java}
> WITH x AS (
> SELECT 
>   id, 
>   country 
> FROM XXX.abc
> ),
> y AS (
> SELECT 
>   id, 
>   gender FROM XXX.def
> )
> SELECT x.id, x.country, y.gender FROM x JOIN y ON x.id = y.id{code}
>  after the rule execution, the logical plan doesn't change the table name. 
> Here is the logical of the before and master
>  
> {code:java}
> Before Rule Master Plan: CTE [x, y]
> :  :- 'SubqueryAlias x
> :  :  +- 'Project ['id, 'country]
> :  :     +- 'UnresolvedRelation [XXX, abc], [], false
> :  +- 'SubqueryAlias y
> :     +- 'Project ['id, 'gender]
> :        +- 'UnresolvedRelation [XXX, def], [], false
> +- 'Project ['x.id, 'x.country, 'y.gender]
>    +- 'Join Inner, ('x.id = 'y.id)
>       :- 'UnresolvedRelation [x], [], false
>       +- 'UnresolvedRelation [y], [], false{code}
> {code:java}
> After Rule Master Plan: CTE [x, y]
> :  :- 'SubqueryAlias x
> :  :  +- 'Project ['id, 'country]
> :  :     +- 'UnresolvedRelation [XXX abc], [], false
> :  +- 'SubqueryAlias y
> :     +- 'Project ['id, 'gender]
> :        +- 'UnresolvedRelation [XXX, def], [], false
> +- 'Project ['x.id, 'x.country, 'y.gender]
>    +- 'Join Inner, ('x.id = 'y.id)
>       :- 'UnresolvedRelation [x], [], false
>       +- 'UnresolvedRelation [y], [], false {code}
> and from the print out, i do see 
> {code:java}
> Modified CTE UnresolvedWith: CTE [x, y]
> :  :- 'SubqueryAlias x
> :  :  +- 'Project ['id, 'country]
> :  :     +- 'UnresolvedRelation [schema1, t1], [], false
> :  +- 'SubqueryAlias y
> :     +- 'Project ['id, 'gender]
> :        +- 'UnresolvedRelation [schema2, t2], [], false
> +- 'Project ['x.id, 'x.country, 'y.gender]
>    +- 'Join Inner, ('x.id = 'y.id)
>       :- 'UnresolvedRelation [x], [], false
>       +- 'UnresolvedRelation [y], [], false{code}
> which means the CTE case is executed, but not carried over by the RuleExecutor
>  
> I think in the rule executor, it only replaces any logicalplan's 
> {*}children{*}, but since CTE is not logicalplan's children, it didn't get 
> replaced.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to