[ 
https://issues.apache.org/jira/browse/SPARK-29029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikita Konda updated SPARK-29029:
---------------------------------
    Description: 
We have a use case in which we insert a custom logical operator into our logical plan to shield parts of the plan from some of Spark's optimization rules. We remove this operator again in a custom optimization rule before the plan is handed to SparkStrategies.

However, we hit an issue in the following scenario:

Analyzed plan:
{code:java}
[1] Project [userid#0]
+- [2] SubqueryAlias tmp6
   +- [3] Project [videoid#47L, avebitrate#2, userid#0]
      +- [4] Filter NOT (videoid#47L = cast(30 as bigint))
         +- [5] SubqueryAlias tmp5
            +- [6] CustomBarrier
               +- [7] Project [videoid#47L, avebitrate#2, userid#0]
                  +- [8] Filter (avebitrate#2 < 10)
                     +- [9] SubqueryAlias tmp3
                        +- [10] Project [avebitrate#2, factorial(videoid#1) AS 
videoid#47L, userid#0]
                           +- [11] SubqueryAlias tmp2
                              +- [12] Project [userid#0, videoid#1, 
avebitrate#2]
                                 +- [13] SubqueryAlias tmp1
                                    +- [14] Project [userid#0, videoid#1, 
avebitrate#2]
                                       +- [15] SubqueryAlias views
                                          +- [16] 
Relation[userid#0,videoid#1,avebitrate#2] 
{code}
 

Optimized Plan:
{code:java}
[1] Project [userid#0]
+- [2] Filter (isnotnull(videoid#47L) && NOT (videoid#47L = 30))
   +- [3] Project [factorial(videoid#1) AS videoid#47L, userid#0]
      +- [4] Filter (isnotnull(avebitrate#2) && (avebitrate#2 < 10))
         +- [5] Relation[userid#0,videoid#1,avebitrate#2]
{code}
 
 When this plan is passed into *PhysicalOperation* in *DataSourceStrategy*, collectProjectsAndFilters collects the filter attributes as List(AttributeReference(videoid#47L), AttributeReference(avebitrate#2)). However, at this stage the base relation only exposes videoid#1, so planning fails with the exception *key not found: videoid#47L*.
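The failure can be illustrated with a small, hypothetical sketch (the names and collection shapes below are illustrative, not the actual Catalyst internals): because alias substitution did not fire, the collected filter still references videoid#47L, which the base relation's output does not contain.

```scala
// Hypothetical sketch: the collected filter still references videoid#47L,
// but the base relation only exposes videoid#1, so resolving the filter's
// attributes against the relation output surfaces the unsubstituted alias.
object KeyNotFoundDemo extends App {
  val relationOutput = Set("userid#0", "videoid#1", "avebitrate#2")
  val filterRefs     = Seq("videoid#47L", "avebitrate#2")

  // A strict lookup of each filter attribute against the relation output
  // leaves exactly the unsubstituted alias unresolved:
  val missing = filterRefs.filterNot(relationOutput.contains)
  assert(missing == Seq("videoid#47L"))
  println(s"key not found: ${missing.head}")
}
```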

 On looking further, we noticed that the alias map in *PhysicalOperation.substitute* does contain an entry for the key *videoid#47L*: Map(videoid#47L -> factorial(videoid#1)). However, substitute does not replace the attribute videoid#47L with its aliased expression, because the map key and the attribute in the filter condition differ in their qualifier:
 Attribute key in the alias map: AttributeReference("videoid", LongType, nullable = true)(ExprId(47, _), qualifier = *None*)
 Attribute in the filter condition: AttributeReference("videoid", LongType, nullable = true)(ExprId(47, _), qualifier = *Some(tmp5)*)

The two attributes differ only in the qualifier. If the alias map used AttributeMap, which keys attributes by their ExprId, instead of a plain Map[Attribute, Expression], the lookup would succeed and the issue would disappear.
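A minimal, self-contained model of the mismatch (AttrRef and ExprId below stand in for Catalyst's AttributeReference and expression id; they are not the real classes): a plain Map compares the whole attribute, so the qualifier difference hides the entry, while keying on the ExprId alone, which is essentially what AttributeMap does, finds it.

```scala
// Minimal model of the alias-lookup mismatch described above.
case class ExprId(id: Long)
case class AttrRef(name: String, exprId: ExprId, qualifier: Option[String])

object AliasLookupDemo extends App {
  val aliasKey  = AttrRef("videoid", ExprId(47), None)          // key stored for the Alias
  val filterRef = AttrRef("videoid", ExprId(47), Some("tmp5"))  // reference inside the Filter

  // A plain Map uses full case-class equality, so the qualifier difference
  // hides the entry even though name and ExprId match:
  val plainMap: Map[AttrRef, String] = Map(aliasKey -> "factorial(videoid#1)")
  assert(plainMap.get(filterRef).isEmpty)

  // Keying on ExprId alone ignores the qualifier and finds the expression:
  val byExprId: Map[ExprId, String] = plainMap.map { case (k, v) => k.exprId -> v }
  assert(byExprId.get(filterRef.exprId).contains("factorial(videoid#1)"))
  println("lookup by ExprId succeeds where plain Map lookup fails")
}
```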


> PhysicalOperation.collectProjectsAndFilters should use AttributeMap while 
> substituting aliases
> ----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-29029
>                 URL: https://issues.apache.org/jira/browse/SPARK-29029
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.3.0
>            Reporter: Nikita Konda
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
