[ 
https://issues.apache.org/jira/browse/SPARK-28090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Iskender Unlu updated SPARK-28090:
----------------------------------
    Comment: was deleted

(was:  I will work on this issue.)

> Spark hangs when an execution plan has many projections on nested structs
> -------------------------------------------------------------------------
>
>                 Key: SPARK-28090
>                 URL: https://issues.apache.org/jira/browse/SPARK-28090
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.4.3
>         Environment: Tried in
>  * Spark 2.2.1, Spark 2.4.3 in local mode on Linux, MasOS and Windows
>  * Spark 2.4.3 / Yarn on a Linux cluster
>            Reporter: Ruslan Yushchenko
>            Priority: Major
>
> This was already posted (#28016), but the provided example didn't always 
> reproduce the error. This example consistently reproduces the issue.
> Spark applications freeze on execution plan optimization stage (Catalyst) 
> when a logical execution plan contains a lot of projections that operate on 
> nested struct fields.
> The code listed below demonstrates the issue.
> To reproduce the Spark App does the following:
>  * A small dataframe is created from a JSON example.
>  * Several nested transformations (negation of a number) are applied on 
> struct fields and each time a new struct field is created. 
>  * Once more than 9 such transformations are applied the Catalyst optimizer 
> freezes on optimizing the execution plan.
>  * You can control the freezing by choosing different upper bound for the 
> Range. E.g. it will work file if the upper bound is 5, but will hang is the 
> bound is 10.
> {code:java}
> package com.example
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.types.{StructField, StructType}
> import scala.collection.mutable.ListBuffer
> object SparkApp1IssueSelfContained {
>   // A sample data for a dataframe with nested structs
>   val sample: List[String] =
>     """ { "numerics": {"num1": 101, "num2": 102, "num3": 103, "num4": 104, 
> "num5": 105, "num6": 106, "num7": 107, "num8": 108, "num9": 109, "num10": 
> 110, "num11": 111, "num12": 112, "num13": 113, "num14": 114, "num15": 115} } 
> """ ::
>     """ { "numerics": {"num1": 201, "num2": 202, "num3": 203, "num4": 204, 
> "num5": 205, "num6": 206, "num7": 207, "num8": 208, "num9": 209, "num10": 
> 210, "num11": 211, "num12": 212, "num13": 213, "num14": 214, "num15": 215} } 
> """ ::
>     """ { "numerics": {"num1": 301, "num2": 302, "num3": 303, "num4": 304, 
> "num5": 305, "num6": 306, "num7": 307, "num8": 308, "num9": 309, "num10": 
> 310, "num11": 311, "num12": 312, "num13": 313, "num14": 314, "num15": 315} } 
> """ ::
>     Nil
>   /**
>     * Transforms a column inside a nested struct. The transformed value will 
> be put into a new field of that nested struct
>     *
>     * The output column name can omit the full path as the field will be 
> created at the same level of nesting as the input column.
>     *
>     * @param inputColumnName  A column name for which to apply the 
> transformation, e.g. `company.employee.firstName`.
>     * @param outputColumnName The output column name. The path is optional, 
> e.g. you can use `transformedName` instead of 
> `company.employee.transformedName`.
>     * @param expression       A function that applies a transformation to a 
> column as a Spark expression.
>     * @return A dataframe with a new field that contains transformed values.
>     */
>   def transformInsideNestedStruct(df: DataFrame,
>                                   inputColumnName: String,
>                                   outputColumnName: String,
>                                   expression: Column => Column): DataFrame = {
>     def mapStruct(schema: StructType, path: Seq[String], parentColumn: 
> Option[Column] = None): Seq[Column] = {
>       val mappedFields = new ListBuffer[Column]()
>       def handleMatchedLeaf(field: StructField, curColumn: Column): 
> Seq[Column] = {
>         val newColumn = expression(curColumn).as(outputColumnName)
>         mappedFields += newColumn
>         Seq(curColumn)
>       }
>       def handleMatchedNonLeaf(field: StructField, curColumn: Column): 
> Seq[Column] = {
>         // Non-leaf columns need to be further processed recursively
>         field.dataType match {
>           case dt: StructType => Seq(struct(mapStruct(dt, path.tail, 
> Some(curColumn)): _*).as(field.name))
>           case _ => throw new IllegalArgumentException(s"Field 
> '${field.name}' is not a struct type.")
>         }
>       }
>       val fieldName = path.head
>       val isLeaf = path.lengthCompare(2) < 0
>       val newColumns = schema.fields.flatMap(field => {
>         // This is the original column (struct field) we want to process
>         val curColumn = parentColumn match {
>           case None => new Column(field.name)
>           case Some(col) => col.getField(field.name).as(field.name)
>         }
>         if (field.name.compareToIgnoreCase(fieldName) != 0) {
>           // Copy unrelated fields as they were
>           Seq(curColumn)
>         } else {
>           // We have found a match
>           if (isLeaf) {
>             handleMatchedLeaf(field, curColumn)
>           } else {
>             handleMatchedNonLeaf(field, curColumn)
>           }
>         }
>       })
>       newColumns ++ mappedFields
>     }
>     val schema = df.schema
>     val path = inputColumnName.split('.')
>     df.select(mapStruct(schema, path): _*)
>   }
>   /**
>     * This Spark Job demonstrates an issue of execution plan freezing when 
> there are a lot of projections
>     * involving nested structs in an execution plan.
>     *
>     * The example works as follows:
>     * - A small dataframe is created from a JSON example above
>     * - A nested withColumn map transformation is used to apply a 
> transformation on a struct field and create
>     *   a new struct field.
>     * - Once more than 9 such transformations are applied the Catalyst 
> optimizer freezes on optimizing
>     *   the execution plan
>     *
>     */
>   def main(args: Array[String]): Unit = {
>     val sparkBuilder = SparkSession.builder().appName("Nested Projections 
> Issue Example")
>     val spark = sparkBuilder
>       .master("local[4]")
>       .getOrCreate()
>     import spark.implicits._
>     val dfInput = spark.read.json(sample.toDS)
>     // Apply several negations. You can change the number of negations by 
> changing the upper bound of the range up to 16
>     val dfOutput = Range(1,12).foldLeft(dfInput)( (df, i) => {
>       transformInsideNestedStruct(df, s"numerics.num$i", s"out_num$i", c => 
> -c)
>     })
>     dfOutput.printSchema()
>     dfOutput.explain(true)
>     dfOutput.show(false)
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to