[ https://issues.apache.org/jira/browse/SPARK-26680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16748277#comment-16748277 ]

Bruce Robbins commented on SPARK-26680:
---------------------------------------

I will make a PR for this, but I would like to hear any suggested solutions 
beyond the two that I proposed above.

> StackOverflowError if Stream passed to groupBy
> ----------------------------------------------
>
>                 Key: SPARK-26680
>                 URL: https://issues.apache.org/jira/browse/SPARK-26680
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Bruce Robbins
>            Priority: Major
>
> This Java code results in a StackOverflowError:
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.spark.sql.Column;
> import scala.collection.JavaConverters;
> // df is a Dataset<Row> with int columns id1 and id2 (see the spark-shell example below)
> List<Column> groupByCols = new ArrayList<>();
> groupByCols.add(new Column("id1"));
> scala.collection.Seq<Column> groupByColsSeq =
>     JavaConverters.asScalaIteratorConverter(groupByCols.iterator())
>         .asScala().toSeq();
> df.groupBy(groupByColsSeq).max("id2").toDF("id1", "id2").show();
> {code}
> The {{toSeq}} method above produces a Stream. Passing a Stream to groupBy 
> results in the StackOverflowError. In fact, the error can be produced more 
> easily in spark-shell:
> {noformat}
> scala> val df = spark.read.schema("id1 int, id2 int").csv("testinput.csv")
> df: org.apache.spark.sql.DataFrame = [id1: int, id2: int]
> scala> val groupBySeq = Stream(col("id1"))
> groupBySeq: scala.collection.immutable.Stream[org.apache.spark.sql.Column] = Stream(id1, ?)
> scala> df.groupBy(groupBySeq: _*).max("id2").toDF("id1", "id2").collect
> java.lang.StackOverflowError
>   at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1161)
>   at scala.collection.immutable.Stream.drop(Stream.scala:797)
>   at scala.collection.immutable.Stream.drop(Stream.scala:204)
>   at scala.collection.LinearSeqOptimized.apply(LinearSeqOptimized.scala:66)
>   at scala.collection.LinearSeqOptimized.apply$(LinearSeqOptimized.scala:65)
>   at scala.collection.immutable.Stream.apply(Stream.scala:204)
>   at org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:45)
>   at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:138)
>   at scala.Option.getOrElse(Option.scala:138)
>   at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:133)
>   at org.apache.spark.sql.execution.CodegenSupport.$anonfun$consume$3(WholeStageCodegenExec.scala:159)
>   at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
>   at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1171)
>   at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1161)
>   at scala.collection.immutable.Stream.drop(Stream.scala:797)
>   at scala.collection.immutable.Stream.drop(Stream.scala:204)
>   at scala.collection.LinearSeqOptimized.apply(LinearSeqOptimized.scala:66)
>   at scala.collection.LinearSeqOptimized.apply$(LinearSeqOptimized.scala:65)
>   at scala.collection.immutable.Stream.apply(Stream.scala:204)
>   at org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:45)
>   at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:138)
>   at scala.Option.getOrElse(Option.scala:138)
>   at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:133)
>   at org.apache.spark.sql.execution.CodegenSupport.$anonfun$consume$3(WholeStageCodegenExec.scala:159)
> ...etc...
> {noformat}
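> For reference, forcing the Stream before the call should sidestep the error, 
> since codegen then never sees a lazy collection. A minimal sketch in the same 
> spark-shell session (same {{df}} and {{groupBySeq}} as above), which already 
> points at lazy evaluation as the culprit:
> {code:scala}
> // Workaround sketch (not a fix): force the lazy Stream into a strict List
> // before handing it to groupBy.
> df.groupBy(groupBySeq.toList: _*).max("id2").toDF("id1", "id2").collect
> {code}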
> This is due to the lazy nature of Streams. The method {{consume}} in 
> {{CodegenSupport}} assumes that a map function will be eagerly evaluated:
> {code:scala}
> val inputVars =
>         ...
>         ctx.currentVars = null  // <== the closure cares about this
>         ctx.INPUT_ROW = row
>         output.zipWithIndex.map { case (attr, i) =>
>           BoundReference(i, attr.dataType, attr.nullable).genCode(ctx)
>         }
>     ...
>     ctx.currentVars = inputVars
>     ctx.INPUT_ROW = null
>     ctx.freshNamePrefix = parent.variablePrefix
>     val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
> {code}
> The closure passed to the map function assumes {{ctx.currentVars}} will be 
> set to null. But due to lazy evaluation, {{ctx.currentVars}} is set to 
> something else by the time the closure is actually called. Worse yet, 
> {{ctx.currentVars}} is set to the yet-to-be-evaluated inputVars stream. The 
> closure uses {{ctx.currentVars}} (via the call {{genCode(ctx)}}), therefore 
> it ends up using the very data structure it is still in the middle of creating.
> You can recreate the problem in a vanilla Scala shell:
> {code:scala}
> scala> var p1: Seq[Any] = null
> p1: Seq[Any] = null
> scala> val s = Stream(1, 2).zipWithIndex.map { case (x, i) => if (p1 != null) p1(i) else x }
> s: scala.collection.immutable.Stream[Any] = Stream(1, ?)
> scala> p1 = s
> p1: Seq[Any] = Stream(1, ?)
> scala> s.foreach(println)
> 1
> java.lang.StackOverflowError
>   at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1166)
>   at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
>   at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:415)
>   at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1169)
>   at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
> ... etc ...
> {code}
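> The toy example also shows why eager evaluation sidesteps the problem: if the 
> mapped result is forced into a strict collection before {{p1}} can alias it, 
> the closures run while {{p1}} is still null and the result never refers back 
> to itself. A minimal sketch of that idea:
> {code:scala}
> var p1: Seq[Any] = null
> // .toList forces the map closures to run now, while p1 is still null,
> // so the resulting collection cannot end up referring to itself.
> val s = Stream(1, 2).zipWithIndex.map { case (x, i) => if (p1 != null) p1(i) else x }.toList
> p1 = s
> s.foreach(println)  // prints 1 and 2, no StackOverflowError
> {code}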
> Possible fixes:
>  - In {{Dataset.groupBy}}, we could ensure the passed Seq is a List before passing it to RelationalGroupedDataset (simply by changing {{cols.map(_.expr)}} to {{cols.toList.map(_.expr)}}); see the sketch after this list.
>  - In {{CodegenSupport}}, we could ensure that the map function is eagerly evaluated (simply by adding {{.toList}} to the construct).
>  - Something else that hasn't occurred to me (opinions welcome).
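> For concreteness, a rough sketch of what the first option could look like in 
> {{Dataset.groupBy}} (the method body is paraphrased and may not match the 
> current source exactly):
> {code:scala}
> // Sketch of option 1: force the caller-supplied Seq into a strict List
> // before the expressions are handed to RelationalGroupedDataset.
> @scala.annotation.varargs
> def groupBy(cols: Column*): RelationalGroupedDataset = {
>   RelationalGroupedDataset(toDF(), cols.toList.map(_.expr), RelationalGroupedDataset.GroupByType)
> }
> {code}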


