Michael Styles created SPARK-20413:
--------------------------------------

             Summary: New Optimizer Hint to prevent collapsing of adjacent projections
                 Key: SPARK-20413
                 URL: https://issues.apache.org/jira/browse/SPARK-20413
             Project: Spark
          Issue Type: Improvement
          Components: Optimizer, PySpark, SQL
    Affects Versions: 2.1.0
            Reporter: Michael Styles


I am proposing that a new optimizer hint called NO_COLLAPSE be introduced. This 
hint is essentially identical to Oracle's NO_MERGE hint. 

Let me first give an example of why I am proposing this. 

# Assumes `spark` is an active SparkSession and user_agent_details is the
# struct-returning UDF described below.
df1 = spark.createDataFrame([(1, "abc")], ["id", "user_agent"])
df2 = df1.withColumn("ua", user_agent_details(df1["user_agent"]))
df3 = df2.select(df2["ua"].device_form_factor.alias("c1"),
                 df2["ua"].browser_version.alias("c2"))
df3.explain(True)

== Parsed Logical Plan == 
'Project [ua#85[device_form_factor] AS c1#90, ua#85[browser_version] AS c2#91] 
+- Project [id#80L, user_agent#81, UDF(user_agent#81) AS ua#85] 
   +- LogicalRDD [id#80L, user_agent#81] 

== Analyzed Logical Plan == 
c1: string, c2: string 
Project [ua#85.device_form_factor AS c1#90, ua#85.browser_version AS c2#91] 
+- Project [id#80L, user_agent#81, UDF(user_agent#81) AS ua#85] 
   +- LogicalRDD [id#80L, user_agent#81] 

== Optimized Logical Plan == 
Project [UDF(user_agent#81).device_form_factor AS c1#90, UDF(user_agent#81).browser_version AS c2#91]
+- LogicalRDD [id#80L, user_agent#81] 

== Physical Plan == 
*Project [UDF(user_agent#81).device_form_factor AS c1#90, UDF(user_agent#81).browser_version AS c2#91]
+- Scan ExistingRDD[id#80L,user_agent#81] 

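user_agent_details is a user-defined function that returns a struct. As the
optimized and physical plans show, the UDF appears twice in the collapsed
projection, once for each field extracted from its result, so it is evaluated
twice per row. With an expensive function or many extracted fields, this can
cause performance problems. The duplication comes from the CollapseProject
optimizer rule, which merges adjacent projections by substituting the inner
expressions into the outer ones.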
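
To make the cost concrete, here is a minimal sketch in plain Python (not Spark
code) that counts UDF invocations when the two projections are kept separate
versus merged by substitution. The stand-in user_agent_details body, its
return values, and the helper names stacked and collapsed are assumptions made
for illustration only.

```python
# Toy model, not Spark: count how often a struct-returning UDF runs when two
# projections stay separate versus when they are merged by substitution.
calls = {"n": 0}

def user_agent_details(user_agent):
    # Hypothetical stand-in for the UDF in the report; returns a struct-like dict.
    calls["n"] += 1
    return {"device_form_factor": "desktop", "browser_version": "1.0"}

rows = [{"id": 1, "user_agent": "abc"}]

def stacked(rows):
    # Inner projection computes ua once; outer projection reads two fields from it.
    inner = [dict(r, ua=user_agent_details(r["user_agent"])) for r in rows]
    return [{"c1": r["ua"]["device_form_factor"],
             "c2": r["ua"]["browser_version"]} for r in inner]

def collapsed(rows):
    # After collapsing, every reference to ua is replaced by the UDF call itself.
    return [{"c1": user_agent_details(r["user_agent"])["device_form_factor"],
             "c2": user_agent_details(r["user_agent"])["browser_version"]}
            for r in rows]

calls["n"] = 0
stacked_result = stacked(rows)
stacked_calls = calls["n"]

calls["n"] = 0
collapsed_result = collapsed(rows)
collapsed_calls = calls["n"]

print(stacked_calls, collapsed_calls)  # prints: 1 2
```

Both variants produce the same rows; only the number of UDF calls per row
differs, and it grows with the number of fields extracted.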

I'm proposing a hint that prevents the optimizer from collapsing adjacent
projections. A new function called 'no_collapse' would be introduced for this
purpose. Consider the following example and generated query plan.

# Assumes `spark` is an active SparkSession and F.no_collapse is the proposed
# (not yet existing) hint function.
df1 = spark.createDataFrame([(1, "abc")], ["id", "user_agent"])
df2 = F.no_collapse(df1.withColumn("ua",
                                   user_agent_details(df1["user_agent"])))
df3 = df2.select(df2["ua"].device_form_factor.alias("c1"),
                 df2["ua"].browser_version.alias("c2"))
df3.explain(True)

== Parsed Logical Plan == 
'Project [ua#69[device_form_factor] AS c1#75, ua#69[browser_version] AS c2#76] 
+- NoCollapseHint 
   +- Project [id#64L, user_agent#65, UDF(user_agent#65) AS ua#69] 
      +- LogicalRDD [id#64L, user_agent#65] 

== Analyzed Logical Plan == 
c1: string, c2: string 
Project [ua#69.device_form_factor AS c1#75, ua#69.browser_version AS c2#76] 
+- NoCollapseHint 
   +- Project [id#64L, user_agent#65, UDF(user_agent#65) AS ua#69] 
      +- LogicalRDD [id#64L, user_agent#65] 

== Optimized Logical Plan == 
Project [ua#69.device_form_factor AS c1#75, ua#69.browser_version AS c2#76] 
+- NoCollapseHint 
   +- Project [UDF(user_agent#65) AS ua#69] 
      +- LogicalRDD [id#64L, user_agent#65] 

== Physical Plan == 
*Project [ua#69.device_form_factor AS c1#75, ua#69.browser_version AS c2#76] 
+- *Project [UDF(user_agent#65) AS ua#69] 
   +- Scan ExistingRDD[id#64L,user_agent#65] 

As can be seen from the query plan, the user-defined function is now evaluated 
once per row.
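
The way a hint node can block the rule can be sketched with a toy optimizer in
plain Python. This is a simplified model under stated assumptions, not Spark's
CollapseProject implementation: the node types, the tuple-based expression
encoding, and the helpers substitute, collapse, and plan_udf_count are all
hypothetical, and the sketch ignores column pruning.

```python
from collections import namedtuple

# Toy plan nodes; the names mirror the plans above, but this is not Spark code.
Relation = namedtuple("Relation", ["columns"])
Project = namedtuple("Project", ["exprs", "child"])  # exprs: {alias: expr}
NoCollapseHint = namedtuple("NoCollapseHint", ["child"])

# Expressions are tuples: ("attr", name), ("udf", arg), ("field", expr, name).

def substitute(expr, aliases):
    # Replace attribute references with the expressions the child projection defines.
    if expr[0] == "attr":
        return aliases.get(expr[1], expr)
    if expr[0] == "udf":
        return ("udf", substitute(expr[1], aliases))
    return ("field", substitute(expr[1], aliases), expr[2])

def collapse(plan):
    # Merge Project directly over Project; any node in between blocks the merge.
    if isinstance(plan, Project):
        child = collapse(plan.child)
        if isinstance(child, Project):
            merged = {a: substitute(e, child.exprs) for a, e in plan.exprs.items()}
            return Project(merged, child.child)
        return Project(plan.exprs, child)
    if isinstance(plan, NoCollapseHint):
        return NoCollapseHint(collapse(plan.child))
    return plan

def udf_count(expr):
    # Number of UDF calls inside one expression.
    if expr[0] == "attr":
        return 0
    return (1 if expr[0] == "udf" else 0) + udf_count(expr[1])

def plan_udf_count(plan):
    # Number of UDF calls in the whole plan, i.e. evaluations per input row.
    if isinstance(plan, Relation):
        return 0
    own = 0
    if isinstance(plan, Project):
        own = sum(udf_count(e) for e in plan.exprs.values())
    return own + plan_udf_count(plan.child)

inner = Project({"id": ("attr", "id"),
                 "user_agent": ("attr", "user_agent"),
                 "ua": ("udf", ("attr", "user_agent"))},
                Relation(["id", "user_agent"]))
outer_exprs = {"c1": ("field", ("attr", "ua"), "device_form_factor"),
               "c2": ("field", ("attr", "ua"), "browser_version")}

plain = collapse(Project(outer_exprs, inner))
hinted = collapse(Project(outer_exprs, NoCollapseHint(inner)))

print(plan_udf_count(plain), plan_udf_count(hinted))  # prints: 2 1
```

In this model the hint works purely by sitting between the two Project nodes,
so the rule's Project-over-Project pattern no longer matches.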



