[ 
https://issues.apache.org/jira/browse/SPARK-25707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Borsos updated SPARK-25707:
---------------------------------
    Attachment: UdfCachingNullableBug.scala

> Repeated caching doesn't work when using an UDF on a nullable field
> -------------------------------------------------------------------
>
>                 Key: SPARK-25707
>                 URL: https://issues.apache.org/jira/browse/SPARK-25707
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, Spark Core
>    Affects Versions: 2.3.1
>         Environment: Reproducible on local Spark executors
>            Reporter: David Borsos
>            Priority: Minor
>         Attachments: UdfCachingNullableBug.scala
>
>
> It seems that Spark fails to find already cached data and triggers a 
> re-read from the source when all of the following circumstances are 
> present:
>  # Caching a dataset that is derived from another already cached dataset
>  # Having nullable fields in the Dataset of types that are mapped to 
> primitives (Integer, Boolean, etc.)
>  # Using a UDF that takes non-nullable primitive parameters (Int, Boolean) on 
> one of these nullable fields
> In this case Spark's optimizer will create a null-check on these fields prior 
> to passing them into the UDF's code (presumably to prevent a 
> NullPointerException). The plans with the null-checks are generated when a 
> Dataset is persisted and when the optimizer is looking for fragments of a 
> query plan that can be read from cache instead of resolving the full Dataset 
> lineage.
> However, the plan fragment of the query that is currently being 
> persisted and the one in the cache do not match: the lookup version 
> has the null-check included in the query plan twice, which prevents the 
> optimizer from generating a plan that relies on the cached data and 
> ultimately triggers a repeated read from the original datasource.
> The reproduction sequence in pseudocode:
> {code:java}
> base = spark.someDataset(columns: ("a" String, "b" Int))
> someUdf = someUdf(x: Int)
> cached = base.select($"a", someUdf($"b")).cache()
> cachedAgain = cached.select(...).cache(){code}
> In this case I'd expect 'cachedAgain' to re-use the data from 'cached' 
> instead of re-reading 'base'.
> I'm attaching a small Scala code fragment with a more detailed reproduction. 
> Interestingly, the whole process works if I replace 'Int' with 
> 'java.lang.Integer' in the UDF parameter list. The generated physical plans 
> are:
> {code:java}
> --- using scala.Int ---
> == Physical Plan ==
> InMemoryTableScan [a#6, (b * 2)#23]
>    +- InMemoryRelation [a#6, (b * 2)#23], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>          +- *(1) Project [a#6, (if (isnull(b#7)) null else if (isnull(b#7)) null else UDF(b#7) * 2) AS (b * 2)#23]
>             +- Scan ExistingRDD[a#6,b#7]
> {code}
> {code:java}
> --- using java.lang.Integer ---
> == Physical Plan ==
> InMemoryTableScan [a#53, (b * 2)#70]
>    +- InMemoryRelation [a#53, (b * 2)#70], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>          +- *(1) Project [a#53, (b#57 * 2) AS (b * 2)#70]
>             +- InMemoryTableScan [a#53, b#57]
>                   +- InMemoryRelation [a#53, b#57], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>                         +- *(1) Project [a#53, UDF(b#54) AS b#57]
>                            +- Scan ExistingRDD[a#53,b#54]{code}
> The underlying issue seems to be that when `persist` is called, Spark will 
> effectively execute this code:
> {code:java}
> spark.sessionState.executePlan(dataset.queryExecution.analyzed).analyzed{code}
> (CacheManager, lines #100 and #166)
> This seems to generate the null-check on nullable fields twice, so the 
> resulting plan does not match the already cached plan:
> {code:java}
> Plan cached:
> Project [a#6, if (isnull(b#7)) null else UDF(b#7) AS b#10]
> +- LogicalRDD [a#6, b#7], false
> Plan that is used in the lookup:
> Project [a#6, if (isnull(b#7)) null else if (isnull(b#7)) null else UDF(b#7) 
> AS b#10]
> +- LogicalRDD [a#6, b#7], false{code}
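
The reproduction sequence quoted above can be sketched as a small standalone Scala program. This is only an illustration under stated assumptions and is not the attached UdfCachingNullableBug.scala: the object name, the sample rows, the UDF bodies, and the helper `run` are all hypothetical, and the data is built from a local `Seq` rather than an existing RDD, so the leaf node of the plan will differ from the ones shown in the report. It assumes a Spark 2.3.x dependency on the classpath.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.udf

// Hypothetical sketch of the scenario described in the report.
object UdfCachingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("udf-caching-sketch")
      .getOrCreate()
    import spark.implicits._

    // Option[Int] yields a nullable integer column "b" (sample rows are made up).
    val base: DataFrame =
      Seq(("x", Option(1)), ("y", Option.empty[Int])).toDF("a", "b")

    // Variant 1: primitive parameter, the case the report describes as failing.
    val primitiveUdf = udf((x: Int) => x + 1)

    // Variant 2: boxed parameter, the case the report describes as working.
    val boxedFn: java.lang.Integer => java.lang.Integer =
      x => if (x == null) null else java.lang.Integer.valueOf(x + 1)
    val boxedUdf = udf(boxedFn)

    // Cache a derived Dataset, then cache a second Dataset derived from it.
    def run(label: String, derive: DataFrame => DataFrame): Unit = {
      val cached = derive(base).cache()
      cached.count() // materialize the first cache
      val cachedAgain = cached.select($"a", ($"b" * 2).as("b2")).cache()
      println(s"--- $label ---")
      cachedAgain.explain() // inspect whether the inner InMemoryRelation is reused
      cachedAgain.unpersist()
      cached.unpersist()
    }

    run("using scala.Int", df => df.select($"a", primitiveUdf($"b").as("b")))
    run("using java.lang.Integer", df => df.select($"a", boxedUdf($"b").as("b")))

    spark.stop()
  }
}
```

Comparing the two printed plans should show whether the second `cache()` call picks up the first cached relation. The exact plan text depends on the Spark version in use.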



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
