Vinitha Reddy Gankidi created SPARK-28188:
---------------------------------------------
Summary: Materialize Dataframe API
Key: SPARK-28188
URL: https://issues.apache.org/jira/browse/SPARK-28188
Project: Spark
Issue Type: New Feature
Components: Spark Core
Affects Versions: 2.4.3
Reporter: Vinitha Reddy Gankidi
We have added a new API to materialize dataframes and our internal users have
found it very useful. For use cases where you need to do different computations
on the same dataframe, Spark recomputes the dataframe each time. This is
problematic if evaluation of the dataframe is expensive.
Materialize is a Spark action. It is a way to let Spark explicitly know that
the dataframe has already been computed. Once a dataframe is materialized,
Spark skips all stages prior to the materialize when the dataframe is reused
later on.
Spark may scan the same table twice if two queries load different columns. For
example, the following two queries would scan the same data twice:
{code:scala}
val tab = spark.table("some_table").filter("c LIKE '%match%'")
val num_groups = tab.agg(countDistinct($"a"))
val groups_with_b = tab.groupBy($"a").agg(min($"b") as "min"){code}
The same table is scanned twice because Spark doesn't know it should also load
column b when the first query runs. You can use materialize to load the data
once and then reuse it:
{code:scala}
val materialized = spark.table("some_table").filter("c LIKE '%match%'")
  .select($"a", $"b").repartition($"a").materialize()
val num_groups = materialized.agg(countDistinct($"a"))
val groups_with_b = materialized.groupBy($"a").agg(min($"b") as "min"){code}
This uses select to filter out columns that don't need to be loaded. Without
it, Spark doesn't know that only a and b are going to be used later.
The example also uses repartition to add a shuffle, because Spark resumes
execution from the last shuffle. In most cases you will need to repartition
the dataframe before materializing it: repartition introduces a new stage
boundary, and only the stages before that shuffle are skipped on reuse.
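To make the role of the shuffle concrete, here is a sketch (note that the
materialize() method is the API proposed in this issue, not an existing Spark
method):
{code:scala}
// Assumes the proposed materialize() API from this issue.
// Without a shuffle there is no shuffle output to resume from, so
// later jobs on the result would still re-run the scan and filter:
val noShuffle = spark.table("some_table")
  .filter("c LIKE '%match%'")
  .select($"a", $"b")
  .materialize()

// With repartition, the scan and filter run once and write their
// output to shuffle files; later jobs resume from that shuffle:
val withShuffle = spark.table("some_table")
  .filter("c LIKE '%match%'")
  .select($"a", $"b")
  .repartition($"a")  // introduces a stage boundary
  .materialize()      // reuse starts after this shuffle
{code}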
h3. Materialize vs Cache:
* Caching/persisting of dataframes is lazy: the data is computed and kept in
memory on the nodes the first time an action runs. Materialize, by contrast, is
itself an action. It runs a job that produces the rows the dataframe
represents and returns a new dataframe with the result. When the result
dataframe is used, Spark resumes execution using the data from the last
shuffle.
* By reusing shuffle data, materialized data is served by the cluster's
persistent shuffle servers instead of Spark executors, which makes materialize
more reliable. Caching, on the other hand, happens in the executors where the
tasks run, and cached data can be lost if executors time out from inactivity
or run out of memory.
* Since materialize is more reliable and uses fewer resources than cache, it
is usually the better choice for batch workloads. For processing that iterates
over the same dataset many times, however, it is better to keep the data in
memory using cache or persist.
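For comparison, the closest existing alternative uses persist, which is real
Spark API today (the storage level and query shapes below simply mirror the
earlier example):
{code:scala}
import org.apache.spark.sql.functions.{countDistinct, min}
import org.apache.spark.storage.StorageLevel

val cached = spark.table("some_table")
  .filter("c LIKE '%match%'")
  .select($"a", $"b")
  .persist(StorageLevel.MEMORY_AND_DISK)  // lazy: nothing runs yet

// The first action computes and caches the data; the second reuses
// the cached blocks, but recomputes them if they were evicted or an
// executor holding them was lost:
val num_groups = cached.agg(countDistinct($"a"))
val groups_with_b = cached.groupBy($"a").agg(min($"b") as "min")
{code}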
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)