Takuya Ueshin created SPARK-15313:
-------------------------------------

             Summary: EmbedSerializerInFilter rule should keep exprIds of output of surrounded SerializeFromObject.
                 Key: SPARK-15313
                 URL: https://issues.apache.org/jira/browse/SPARK-15313
             Project: Spark
          Issue Type: Bug
          Components: SQL
            Reporter: Takuya Ueshin


The following code:

{code}
// Assumes a spark-shell session (an active SparkSession named `spark`).
import spark.implicits._
import org.apache.spark.sql.functions.expr

val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
ds.filter(_._1 == "b").select(expr("_1").as[String]).foreach(println(_))
{code}

throws an exception instead of printing {{b}}:

{noformat}
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: _1#420
 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:50)
 at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
 at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
 at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:265)
 at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:265)
 at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:68)
 at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
 at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:254)
 at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
 at org.apache.spark.sql.execution.ProjectExec$$anonfun$4.apply(basicPhysicalOperators.scala:55)
 at org.apache.spark.sql.execution.ProjectExec$$anonfun$4.apply(basicPhysicalOperators.scala:54)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.immutable.List.foreach(List.scala:381)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.immutable.List.map(List.scala:285)
 at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:54)
 at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
 at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:79)
 at org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:194)
 at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
 at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:218)
 at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:244)
 at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
 at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
 at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
 at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
 at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:218)
 at org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:113)
 at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
 at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
 at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
 at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
 at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:79)
 at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:40)
 at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
 at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
 at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
 at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
 at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:30)
 at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:304)
 at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:343)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
 at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
 at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
 at org.apache.spark.sql.execution.DeserializeToObject.doExecute(objects.scala:59)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
 at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
 at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
 at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:85)
 at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:85)
 at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2285)
 at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2282)
 at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply$mcV$sp(Dataset.scala:2036)
 at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2036)
 at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2036)
 at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
 at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2435)
 at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2035)
...
 Cause: java.lang.RuntimeException: Couldn't find _1#420 in [_1#416,_2#417]
 at scala.sys.package$.error(package.scala:27)
 at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:94)
 at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:88)
 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
 at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
 at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
 at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:265)
 at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:265)
 at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:68)
 at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
 at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:254)
 at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
 at org.apache.spark.sql.execution.ProjectExec$$anonfun$4.apply(basicPhysicalOperators.scala:55)
 at org.apache.spark.sql.execution.ProjectExec$$anonfun$4.apply(basicPhysicalOperators.scala:54)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.immutable.List.foreach(List.scala:381)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.immutable.List.map(List.scala:285)
 at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:54)
 at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
 at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:79)
 at org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:194)
 at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
 at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:218)
 at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:244)
 at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
 at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
 at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
 at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
 at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:218)
 at org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:113)
 at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
 at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
 at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
 at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
 at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:79)
 at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:40)
 at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
 at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
 at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
 at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
 at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:30)
 at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:304)
 at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:343)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
 at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
 at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
 at org.apache.spark.sql.execution.DeserializeToObject.doExecute(objects.scala:59)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
 at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
 at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
 at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:85)
 at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:85)
 at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2285)
 at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2282)
 at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply$mcV$sp(Dataset.scala:2036)
 at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2036)
 at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2036)
 at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
 at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2435)
 at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2035)
...
{noformat}
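
For comparison, a possible workaround (an untested sketch, not verified against this build) is to keep the projection in the typed API, so the {{SerializeFromObject}} / {{Filter}} / {{DeserializeToObject}} plan shape that the rule rewrites never forms:

{code}
// Hypothetical workaround (untested): a typed map instead of an untyped select,
// so no SerializeFromObject directly surrounds the Filter on the object.
ds.filter(_._1 == "b").map(_._1).foreach(println(_))
{code}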

This is because the {{EmbedSerializerInFilter}} rule drops the exprIds of the output of the surrounded {{SerializeFromObject}}: after the rewrite, the plan exposes the child's attributes ({{_1#416}}, {{_2#417}}) while the downstream {{Project}} still references {{_1#420}}, so attribute binding fails.

The analyzed and optimized logical plans for the example above are as follows; the {{!}} prefix on {{Project [_1#420]}} in the optimized plan marks it as invalid, since {{_1#420}} no longer appears in its child's output:

{noformat}
== Analyzed Logical Plan ==
_1: string
Project [_1#420]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, scala.Tuple2]._1, true) AS _1#420,input[0, scala.Tuple2]._2 AS _2#421]
   +- Filter <function1>.apply
      +- DeserializeToObject newInstance(class scala.Tuple2), obj#419: scala.Tuple2
         +- LocalRelation [_1#416,_2#417], [[0,1800000001,1,61],[0,1800000001,2,62],[0,1800000001,3,63]]

== Optimized Logical Plan ==
!Project [_1#420]
+- Filter <function1>.apply
   +- LocalRelation [_1#416,_2#417], [[0,1800000001,1,61],[0,1800000001,2,62],[0,1800000001,3,63]]
{noformat}
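
A minimal sketch of one possible fix, assuming Spark 2.0-era Catalyst internals ({{Rule}}, {{SerializeFromObject}}, {{DeserializeToObject}}, {{Alias}}); it illustrates the idea of re-aliasing the rewritten {{Filter}}'s output onto the original exprIds, not the exact patch:

{code}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule

object EmbedSerializerInFilter extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case s @ SerializeFromObject(_, Filter(condition, d: DeserializeToObject)) =>
      // Inline the deserializer into the condition so the filter can run
      // directly against the serialized rows (simplified).
      val newCondition = condition transform {
        case a: Attribute if a == d.output.head => d.deserializer
      }
      val filter = Filter(newCondition, d.child)
      // The missing piece: re-alias the filter's output onto the exprIds of
      // the surrounded SerializeFromObject (e.g. _1#420), so that downstream
      // operators such as Project [_1#420] can still bind their references.
      val keepIds = filter.output.zip(s.output).map { case (attr, out) =>
        Alias(attr, attr.name)(exprId = out.exprId)
      }
      Project(keepIds, filter)
  }
}
{code}

Without the trailing {{Project}}, the optimized plan exposes {{_1#416}}/{{_2#417}} while the {{Project}} above the filter still asks for {{_1#420}}, which is exactly the "Couldn't find _1#420 in [_1#416,_2#417]" failure above.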



