[
https://issues.apache.org/jira/browse/SPARK-37855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
L. C. Hsieh resolved SPARK-37855.
---------------------------------
Resolution: Fixed
> IllegalStateException when transforming an array inside a nested struct
> -----------------------------------------------------------------------
>
> Key: SPARK-37855
> URL: https://issues.apache.org/jira/browse/SPARK-37855
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.2.0
> Environment: OS: Ubuntu 20.04.3 LTS
> Scala version: 2.12.12
>
> Reporter: G Muciaccia
> Assignee: XiDuo You
> Priority: Major
> Fix For: 3.2.1, 3.3.0
>
>
> *NOTE*: this bug is only present in version {{3.2.0}}. Downgrading to
> {{3.1.2}} solves the problem.
> h3. Prerequisites to reproduce the bug
> # use Spark version 3.2.0
> # create a DataFrame with an array field, which contains a struct field with
> a nested array field
> # *apply a limit* to the DataFrame
> # transform the outer array, renaming one of its fields
> # transform the inner array too, which requires two {{getField}} calls in sequence
> h3. Example that reproduces the bug
> This is a minimal example (as minimal as I could make it) to reproduce the
> bug:
> {code}
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.{DataFrame, Row}
> def makeInput(): DataFrame = {
>   val innerElement1 = Row(3, 3.12)
>   val innerElement2 = Row(4, 2.1)
>   val innerElement3 = Row(1, 985.2)
>   val innerElement4 = Row(10, 757548.0)
>   val innerElement5 = Row(1223, 0.665)
>   val outerElement1 = Row(1, Row(List(innerElement1, innerElement2)))
>   val outerElement2 = Row(2, Row(List(innerElement3)))
>   val outerElement3 = Row(3, Row(List(innerElement4, innerElement5)))
>   val data = Seq(
>     Row("row1", List(outerElement1)),
>     Row("row2", List(outerElement2, outerElement3))
>   )
>   val schema = new StructType()
>     .add("name", StringType)
>     .add("outer_array", ArrayType(new StructType()
>       .add("id", IntegerType)
>       .add("inner_array_struct", new StructType()
>         .add("inner_array", ArrayType(new StructType()
>           .add("id", IntegerType)
>           .add("value", DoubleType)
>         ))
>       )
>     ))
>   spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
> }
> // val df = makeInput()
> val df = makeInput().limit(2)
> // val df = makeInput().limit(2).cache()
> val res = df.withColumn("extracted", transform(
>   col("outer_array"),
>   c1 => struct(
>     c1.getField("id").alias("outer_id"),
>     transform(
>       c1.getField("inner_array_struct").getField("inner_array"),
>       c2 => struct(
>         c2.getField("value").alias("inner_value")
>       )
>     )
>   )
> ))
> res.printSchema()
> res.show(false)
> {code}
> h4. Executing the example code
> When executed as-is, the example fails on the {{show}} statement with
> {code}
> java.lang.IllegalStateException Couldn't find _extract_inner_array#23 in
> [name#2,outer_array#3]
> {code}
> However, *if the limit is not applied, or if the DataFrame is cached after
> the limit, everything works* (you can uncomment the corresponding lines in
> the example to try it).
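> A minimal sketch of the workaround (assuming the {{makeInput}} helper and imports from the example above; the extra {{cache()}} call is the only difference from the failing variant):
> {code}
> // Workaround sketch: caching the limited DataFrame before the nested
> // transform avoids the IllegalStateException on Spark 3.2.0.
> val cached = makeInput().limit(2).cache()
> val ok = cached.withColumn("extracted", transform(
>   col("outer_array"),
>   c1 => struct(c1.getField("id").alias("outer_id"))
> ))
> ok.show(false)
> {code}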
--
This message was sent by Atlassian Jira
(v8.20.1#820001)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]