Rommel Yuan created SPARK-49192:
-----------------------------------

             Summary: RuleExecutor doesn't work on CTE UnresolvedWith
                 Key: SPARK-49192
                 URL: https://issues.apache.org/jira/browse/SPARK-49192
             Project: Spark
          Issue Type: Bug
          Components: Spark Core, SQL
    Affects Versions: 3.4.1, 3.3.1
            Reporter: Rommel Yuan


I want to set up a LogicalPlan rule that renames tables: given a list of table 
names, any table whose name matches an entry is replaced with the 
corresponding target table.

Here is the code:
{code:java}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, UnresolvedWith}
import org.apache.spark.sql.catalyst.rules.Rule

// NamedQueryRef is not a Spark class; the rule uses its nqName,
// materializedInSchema and materializedInTableName fields.
case class RenameTableRule(refs: List[NamedQueryRef]) extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    // println(plan.getClass.getName)
    val transformedPlan = plan transform {
      case unresolvedRelation: UnresolvedRelation =>
        val tblSchemaName = unresolvedRelation.tableName.split("\\.")
        // Single-part name (no schema). Note: `return` here exits apply()
        // itself (non-local return), handing back the plan apply() was given.
        if (tblSchemaName.length == 1) return plan

        val schema = tblSchemaName.apply(0)
        val tblName = tblSchemaName.apply(1)
        for (ref <- this.refs) {
          if (schema == "XXX" && tblName == ref.nqName) {
            return unresolvedRelation.copy(
              multipartIdentifier = Seq(ref.materializedInSchema, ref.materializedInTableName),
              unresolvedRelation.options,
              unresolvedRelation.isStreaming)
          }
        }
        unresolvedRelation

      case unresolvedWith: UnresolvedWith =>
        // Rewrite each CTE definition by applying this rule to it recursively.
        val newCteRelations = unresolvedWith.cteRelations.map {
          case (aliasName, subqueryAlias) =>
            val newSubqueryAlias = apply(subqueryAlias).asInstanceOf[SubqueryAlias]
            (aliasName, newSubqueryAlias)
        }
        val modifiedCTEUnresolvedWith = unresolvedWith.copy(cteRelations = newCteRelations)
        printf("Modified CTE UnresolvedWith: %s", modifiedCTEUnresolvedWith.toString())
        modifiedCTEUnresolvedWith

      case otherPlan: LogicalPlan =>
        val newChildren = otherPlan.children.map(child => apply(child))
        val modified_plan = otherPlan.withNewChildren(newChildren)

        modified_plan
    }

    // Print the entire transformed logical plan
    printf("Transformed Logical Plan: %s", transformedPlan.toString())
    transformedPlan
  }
} {code}
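
One detail of the rule above may be worth ruling out before concluding that RuleExecutor is at fault: in Scala 2, a `return` written inside a closure or pattern-matching block that is passed to another method is a non-local return, so it exits the enclosing `def` (here `apply`) rather than only the current case. If that applies here, reaching `return plan` for a single-part name such as `x` or `y` would make `apply` hand back the plan it was originally given. I have not confirmed that this is what happens in the CTE case. The sketch below uses a hypothetical `doubleAll` function on plain lists (no Spark classes) purely to show the language behaviour.

{code:scala}
object NonLocalReturnDemo {
  // Hypothetical stand-in for a rewrite rule: doubles every element, but
  // bails out with `return input` as soon as it meets a negative number.
  def doubleAll(input: List[Int]): List[Int] = {
    val doubled = input.map { x =>
      // Non-local return: this exits doubleAll itself, not just the closure,
      // so whatever `map` had built so far is discarded.
      if (x < 0) return input
      x * 2
    }
    doubled
  }

  def main(args: Array[String]): Unit = {
    println(doubleAll(List(1, 2, 3)))   // List(2, 4, 6)
    println(doubleAll(List(1, -2, 3)))  // List(1, -2, 3): the original input
  }
}
{code}

In the second call the element `1` had already been doubled when the `return` fired, yet the caller still receives the untouched input list.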

Then I register the rule and execute it:
{code:java}
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

val rules: ListBuffer[Rule[LogicalPlan]] = ListBuffer.empty
rules += RenameTableRule(refs)

// Create RuleExecutor with the provided rules
val optimizer = new RuleExecutor[LogicalPlan] {
  val batches = Seq(
    Batch("NameChange", Once, rules.toList: _*)
  )
}

val logicalPlan = CatalystSqlParser.parsePlan(this.sqlQuery)
printf("Before Rule Master Plan: %s", logicalPlan.toString())
val logicalPlanRewrite = optimizer.execute(logicalPlan)
printf("After Rule Master Plan: %s", logicalPlanRewrite.toString()){code}

With the code above, when I run the SQL below with the reference list
("abc", "schema1", "t1"), ("def", "schema2", "t2")
{code:java}
SELECT * FROM XXX.abc a JOIN XXX.def b ON a.id = b.id 

-- The resulting logical plan is equivalent to the logical plan of the SQL below
SELECT * FROM schema1.t1 a JOIN schema2.t2 b ON a.id = b.id{code}
That is the correct behavior.

 
With a CTE, however, the result is not correct. The SQL below is run with the 
same list
("abc", "schema1", "t1"), ("def", "schema2", "t2")
{code:java}
WITH x AS (
SELECT 
  id, 
  country 
FROM XXX.abc
),
y AS (
SELECT 
  id, 
  gender FROM XXX.def
)
SELECT x.id, x.country, y.gender FROM x JOIN y ON x.id = y.id{code}
After the rule executes, the table names in the logical plan are unchanged. 
Here are the logical plans before and after the rule:
 
{code:java}
Before Rule Master Plan: CTE [x, y]
:  :- 'SubqueryAlias x
:  :  +- 'Project ['id, 'country]
:  :     +- 'UnresolvedRelation [XXX, abc], [], false
:  +- 'SubqueryAlias y
:     +- 'Project ['id, 'gender]
:        +- 'UnresolvedRelation [XXX, def], [], false
+- 'Project ['x.id, 'x.country, 'y.gender]
   +- 'Join Inner, ('x.id = 'y.id)
      :- 'UnresolvedRelation [x], [], false
      +- 'UnresolvedRelation [y], [], false{code}
{code:java}
After Rule Master Plan: CTE [x, y]
:  :- 'SubqueryAlias x
:  :  +- 'Project ['id, 'country]
:  :     +- 'UnresolvedRelation [XXX, abc], [], false
:  +- 'SubqueryAlias y
:     +- 'Project ['id, 'gender]
:        +- 'UnresolvedRelation [XXX, def], [], false
+- 'Project ['x.id, 'x.country, 'y.gender]
   +- 'Join Inner, ('x.id = 'y.id)
      :- 'UnresolvedRelation [x], [], false
      +- 'UnresolvedRelation [y], [], false {code}
However, in the printed output I do see:
{code:java}
Modified CTE UnresolvedWith: CTE [x, y]
:  :- 'SubqueryAlias x
:  :  +- 'Project ['id, 'country]
:  :     +- 'UnresolvedRelation [schema1, t1], [], false
:  +- 'SubqueryAlias y
:     +- 'Project ['id, 'gender]
:        +- 'UnresolvedRelation [schema2, t2], [], false
+- 'Project ['x.id, 'x.country, 'y.gender]
   +- 'Join Inner, ('x.id = 'y.id)
      :- 'UnresolvedRelation [x], [], false
      +- 'UnresolvedRelation [y], [], false{code}
which means the UnresolvedWith case is executed, but its result is not carried 
over by the RuleExecutor.
 
I think the RuleExecutor only replaces a LogicalPlan's children, and since the 
CTE relations are not among the children of the UnresolvedWith node, they are 
not replaced.
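
To make the suspicion above concrete, here is a minimal sketch of a tree rewrite that recurses through `children` only. All types (`Node`, `Table`, `Select`, `With`) and the `transform` helper are hypothetical and are not Spark's classes; the sketch assumes the CTE definitions are stored in a field that is not part of `children`, which is how I understand UnresolvedWith to be laid out.

{code:scala}
object ChildrenOnlyTraversal {
  // Hypothetical mini plan tree; these are NOT Spark's classes.
  sealed trait Node { def children: Seq[Node] }
  case class Table(name: String) extends Node { def children: Seq[Node] = Nil }
  case class Select(child: Node) extends Node { def children: Seq[Node] = Seq(child) }
  // `ctes` is an ordinary field and is deliberately left out of `children`.
  case class With(child: Node, ctes: Seq[(String, Node)]) extends Node {
    def children: Seq[Node] = Seq(child)
  }

  // Top-down rewrite: apply the rule to a node, then recurse into children only.
  def transform(node: Node)(rule: PartialFunction[Node, Node]): Node = {
    val rewritten = rule.applyOrElse(node, (n: Node) => n)
    rewritten match {
      case Select(c)     => Select(transform(c)(rule))
      case With(c, ctes) => With(transform(c)(rule), ctes) // ctes are never visited
      case leaf          => leaf
    }
  }

  // Rename rule that only knows about tables.
  val rename: PartialFunction[Node, Node] = {
    case Table("XXX.abc") => Table("schema1.t1")
  }

  val direct: Node = Select(Table("XXX.abc"))
  val withCte: Node = With(Select(Table("x")), Seq("x" -> Select(Table("XXX.abc"))))

  def main(args: Array[String]): Unit = {
    println(transform(direct)(rename))   // Select(Table(schema1.t1))
    println(transform(withCte)(rename))  // unchanged: the CTE body is not a child
  }
}
{code}

In this toy model the table inside the CTE definition is reached only if the rule itself matches on `With` and rewrites `ctes`, which is what the UnresolvedWith case in my rule attempts to do.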
 


