[
https://issues.apache.org/jira/browse/SPARK-25707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
David Borsos updated SPARK-25707:
---------------------------------
Attachment: UdfCachingNullableBug.scala
> Repeated caching doesn't work when using a UDF on a nullable field
> ------------------------------------------------------------------
>
> Key: SPARK-25707
> URL: https://issues.apache.org/jira/browse/SPARK-25707
> Project: Spark
> Issue Type: Bug
> Components: Optimizer, Spark Core
> Affects Versions: 2.3.1
> Environment: Reproducible on local Spark executors
> Reporter: David Borsos
> Priority: Minor
> Attachments: UdfCachingNullableBug.scala
>
>
> It seems that Spark fails to find already-cached data and triggers a
> re-read from the source when all of the following circumstances are present:
> # Caching a Dataset that is derived from another, already cached Dataset
> # Having nullable fields in the Dataset of types that are mapped to
> primitives (Integer, Boolean, etc.)
> # Using a UDF that takes non-nullable primitive parameters (Int, Boolean) on
> one of these nullable fields
> In this case Spark's optimizer will create a null-check on these fields prior
> to passing them into the UDF's code (presumably to prevent a
> NullPointerException). The plans with the null-checks are generated when a
> Dataset is persisted and when the optimizer is looking for fragments of a
> query plan that can be read from cache instead of resolving the full Dataset
> lineage.
> However, the plan fragment of the query currently being persisted does not
> match what is in the cache: the lookup version has the null-check included in
> the query plan twice, preventing the optimizer from generating a plan that
> relies on the cached data and ultimately triggering a repeated read from the
> original datasource.
> The reproduction sequence in pseudocode:
> {code:java}
> base = spark.someDataset(columns: ("a" String, "b" Int))
> someUdf = someUdf(x: Int)
> cached = base.select($"a", someUdf($"b")).cache()
> cachedAgain = cached.select(...).cache(){code}
> In this case I'd expect 'cachedAgain' to re-use the data from 'cached'
> instead of re-reading 'base'.
> I'm attaching a small Scala code fragment with a more detailed reproduction.
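> A minimal self-contained sketch of the reproduction (hedged: the column
> names, the doubling UDF body, and the local-master setup are illustrative,
> not taken from the attachment):
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.udf
>
> object UdfCachingNullableRepro {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder().master("local[*]").getOrCreate()
>     import spark.implicits._
>
>     // "b" is nullable: Option[Int] maps to a nullable IntegerType column
>     val base = Seq(("x", Some(1)), ("y", Option.empty[Int])).toDF("a", "b")
>
>     // UDF with a non-nullable primitive parameter (Int)
>     val someUdf = udf((x: Int) => x * 2)
>
>     val cached = base.select($"a", someUdf($"b").as("b")).cache()
>     cached.count() // materialize the cache
>
>     // Expected: an InMemoryTableScan over 'cached';
>     // observed: a re-read of 'base' from the original source
>     val cachedAgain = cached.select($"a", ($"b" * 2).as("b2")).cache()
>     cachedAgain.explain()
>   }
> }{code}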
> Interestingly, the whole process works if I replace 'Int' with
> 'java.lang.Integer' in the UDF's parameter list. The generated physical plans
> are:
> {code:java}
> --- using scala.Int ---
> == Physical Plan ==
> InMemoryTableScan [a#6, (b * 2)#23]
>    +- InMemoryRelation [a#6, (b * 2)#23], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>          +- *(1) Project [a#6, (if (isnull(b#7)) null else if (isnull(b#7)) null else UDF(b#7) * 2) AS (b * 2)#23]
>             +- Scan ExistingRDD[a#6,b#7]
> {code}
> {code:java}
> --- using java.lang.Integer ---
> == Physical Plan ==
> InMemoryTableScan [a#53, (b * 2)#70]
>    +- InMemoryRelation [a#53, (b * 2)#70], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>          +- *(1) Project [a#53, (b#57 * 2) AS (b * 2)#70]
>             +- InMemoryTableScan [a#53, b#57]
>                   +- InMemoryRelation [a#53, b#57], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>                         +- *(1) Project [a#53, UDF(b#54) AS b#57]
>                            +- Scan ExistingRDD[a#53,b#54]{code}
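> The 'java.lang.Integer' workaround as code (a hedged sketch; the UDF bodies
> are illustrative, not from the attachment):
> {code:java}
> import org.apache.spark.sql.functions.udf
>
> // Non-nullable primitive parameter: the optimizer wraps every call in a
> // generated if(isnull(...)) check, which gets duplicated on re-analysis:
> val primitiveUdf = udf((x: Int) => x * 2)
>
> // Boxed parameter: the UDF is null-tolerant itself, so no null-check is
> // generated and the cache lookup plan matches the cached plan:
> val boxedUdf = udf((x: java.lang.Integer) =>
>   if (x == null) null else Integer.valueOf(x * 2))
> {code}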
> The underlying issue seems to be that when `persist` is called, Spark
> effectively executes this code:
> {code:java}
> spark.sessionState.executePlan(dataset.queryExecution.analyzed).analyzed{code}
> (CacheManager, lines #100 and #166)
> This generates the null-check on nullable fields a second time, so the result
> does not match the already cached plan:
> {code:java}
> Plan cached:
> Project [a#6, if (isnull(b#7)) null else UDF(b#7) AS b#10]
> +- LogicalRDD [a#6, b#7], false
>
> Plan that is used in the lookup:
> Project [a#6, if (isnull(b#7)) null else if (isnull(b#7)) null else UDF(b#7) AS b#10]
> +- LogicalRDD [a#6, b#7], false{code}
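> The double analysis can be observed directly from a driver (a hedged sketch,
> assuming the 'cached' Dataset and 'spark' session from the reproduction):
> {code:java}
> val plan1 = cached.queryExecution.analyzed
> // Re-analyzing an already analyzed plan, as CacheManager does on lookup:
> val plan2 = spark.sessionState.executePlan(plan1).analyzed
> // plan2 carries the if(isnull(...)) wrapper twice, so the cache lookup's
> // plan-equivalence check fails:
> println(plan2.sameResult(plan1))
> {code}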
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)