[ https://issues.apache.org/jira/browse/SPARK-37201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17441189#comment-17441189 ]
angerszhu commented on SPARK-37201:
-----------------------------------

Working on this

> Spark SQL reads unnecessary nested fields (filter after explode)
> ----------------------------------------------------------------
>
>                 Key: SPARK-37201
>                 URL: https://issues.apache.org/jira/browse/SPARK-37201
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.2.0
>            Reporter: Sergey Kotlov
>            Priority: Major
>
> In this example, reading unnecessary nested fields still happens.
> Data preparation:
> {code:java}
> case class Struct(v1: String, v2: String, v3: String)
> case class Event(struct: Struct, array: Seq[String])
>
> Seq(
>   Event(Struct("v1","v2","v3"), Seq("cx1", "cx2"))
> ).toDF().write.mode("overwrite").saveAsTable("table")
> {code}
> The v2 and v3 columns aren't needed here, but they still show up in the physical plan:
> {code:java}
> spark.table("table")
>   .select($"struct.v1", explode($"array").as("el"))
>   .filter($"el" === "cx1")
>   .explain(true)
>
> // == Physical Plan ==
> // ... ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,array:array<string>>
> {code}
> If you just remove the _filter_ or move the _explode_ to a second _select_, everything is fine:
> {code:java}
> spark.table("table")
>   .select($"struct.v1", explode($"array").as("el"))
>   // .filter($"el" === "cx1")
>   .explain(true)
>
> // ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
>
> spark.table("table")
>   .select($"struct.v1", $"array")
>   .select($"v1", explode($"array").as("el"))
>   .filter($"el" === "cx1")
>   .explain(true)
>
> // ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
> {code}
>
> *Yet another example: left_anti join after a double select:*
> {code:java}
> case class Struct(v1: String, v2: String, v3: String)
> case class Event(struct: Struct, field1: String, field2: String)
>
> Seq(
>   Event(Struct("v1","v2","v3"), "fld1", "fld2")
> ).toDF().write.mode("overwrite").saveAsTable("table")
>
> val joinDf = Seq("id1").toDF("id")
>
> spark.table("table")
>   .select("struct", "field1")
>   .select($"struct.v1", $"field1")
>   .join(joinDf, $"field1" === joinDf("id"), "left_anti")
>   .explain(true)
>
> // ===> ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,field1:string>
> {code}
> Instead of the first select, it can be other kinds of manipulation of the original df, for example {color:#00875a}.withColumn("field3", lit("f3")){color} or {color:#00875a}.drop("field2"){color}, which also lead to reading unnecessary nested fields from _struct_.
> But if you just remove the first select or change the join type, the nested fields are pruned correctly:
> {code:java}
> // .select("struct", "field1")
> // ===> ReadSchema: struct<struct:struct<v1:string>,field1:string>
>
> .join(joinDf, $"field1" === joinDf("id"), "left")
> // ===> ReadSchema: struct<struct:struct<v1:string>,field1:string>
> {code}
> PS: The first select might look strange in the context of this example, but in a real system it might be part of a common API that other parts of the system use with their own expressions on top of this API.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
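The behavior above can be pictured with a toy sketch of what nested-field pruning conceptually does. This is NOT Spark's actual optimizer code and every name in it is illustrative: the idea is to collect the struct fields the query's expressions actually reference and shrink the read schema to just those. If any expression effectively references the struct as a whole (as happens when a pruning rule does not look through an intermediate operator such as the Filter in the report), the full struct has to be read:

```scala
// Toy sketch of nested-field pruning -- not Spark's implementation;
// all names are illustrative. Expressions are modeled as plain strings
// like "struct.v1" (a nested-field reference) or "struct" (the whole
// struct). A struct schema is just an ordered list of field names.
object PruningSketch {
  type StructSchema = Seq[String]

  def pruneStruct(schema: StructSchema,
                  structCol: String,
                  referenced: Seq[String]): StructSchema = {
    if (referenced.contains(structCol)) {
      // The whole struct is referenced: no pruning is possible.
      schema
    } else {
      // Keep only the fields that appear as structCol.<field>.
      val kept = referenced.collect {
        case r if r.startsWith(structCol + ".") =>
          r.stripPrefix(structCol + ".")
      }.toSet
      schema.filter(kept)
    }
  }

  def main(args: Array[String]): Unit = {
    val schema = Seq("v1", "v2", "v3")
    // Only struct.v1 is referenced: v2 and v3 can be pruned.
    println(pruneStruct(schema, "struct", Seq("struct.v1", "el")))
    // An un-traversed operator effectively references the whole
    // struct, so nothing is pruned -- the situation in this ticket.
    println(pruneStruct(schema, "struct", Seq("struct", "el")))
  }
}
```

In these terms, the bug is that with a Filter between the Project and the Generate (or with a left_anti join after a double select), the pruning rule falls back to the "whole struct referenced" branch even though only {color:#00875a}struct.v1{color} is actually used.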