Josh Rosen created SPARK-29266:
----------------------------------
Summary: Optimize Dataset.isEmpty for base relations / unfiltered
datasets
Key: SPARK-29266
URL: https://issues.apache.org/jira/browse/SPARK-29266
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.0.0
Reporter: Josh Rosen
SPARK-23627 added a {{Dataset.isEmpty}} method. This is currently implemented as
{code:java}
def isEmpty: Boolean = withAction("isEmpty",
    limit(1).groupBy().count().queryExecution) { plan =>
  plan.executeCollect().head.getLong(0) == 0
}
{code}
which has a global limit of 1 embedded in the middle of the query plan.
As a result, this will end up computing *all* partitions of the Dataset but
each task can stop early once it's computed a single record.
We could instead implement this as {{ds.limit(1).collect().isEmpty}} but that
will go through the "CollectLimit" execution strategy which first computes 1
partition, then 2, then 4, and so on. That will be faster in some cases but
slower in others: if the dataset is indeed empty then that method will be
slower than one which checks all partitions in parallel, but if it's non-empty
(and most tasks' output is non-empty) then it can be much faster.
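To make the trade-off concrete, here is a rough, Spark-free sketch that counts how many partitions each strategy would compute. It is a simplified model, not Spark's actual code: the object and method names are hypothetical, and the strict doubling of wave sizes is taken from the description above rather than from the real "CollectLimit" implementation.

```scala
object IsEmptyCost {
  // Each element says whether that partition would yield at least one record.

  // Current isEmpty plan: every partition is computed, in a single job.
  def scanAllPartitions(nonEmpty: IndexedSeq[Boolean]): Int = nonEmpty.length

  // Incremental model: compute waves of 1, 2, 4, ... partitions and stop after
  // the first wave that contains a non-empty partition.
  // Returns (partitions computed, jobs launched).
  def scanIncrementally(nonEmpty: IndexedSeq[Boolean]): (Int, Int) = {
    var scanned = 0
    var jobs = 0
    var batch = 1
    var found = false
    while (!found && scanned < nonEmpty.length) {
      val end = math.min(scanned + batch, nonEmpty.length)
      found = nonEmpty.slice(scanned, end).contains(true)
      scanned = end
      jobs += 1
      batch *= 2
    }
    (scanned, jobs)
  }
}
```

Under this model, a dataset with 1000 non-empty partitions needs one partition and one job incrementally versus 1000 partitions all at once, while a dataset with 1000 empty partitions still computes all 1000 but spreads them over 10 sequential jobs.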
There's not an obviously-best implementation here. However, I think there's
high value (and low downside) to optimizing for the special case where the
Dataset is an unfiltered, untransformed input dataset (e.g. the output of
{{spark.read.parquet}}):
I found a production job which calls {{isEmpty}} on the output of
{{spark.read.parquet()}} and the {{isEmpty}} call took several minutes to
complete because it needed to launch hundreds of thousands of tasks to compute
a single record of each partition.
I could instruct the job author to use a different, more efficient method of
checking for non-emptiness, but this feels like the type of optimization that
Spark should handle itself.
Maybe we can special-case {{isEmpty}} for the case where the plan consists of only
a file source scan (or a file source scan followed by a projection, but without
any filters, etc.). In those cases, we can use either the {{limit(1).collect()}}
implementation (under the assumption that we don't have a ton of empty input files)
or something fancier (metadata-only query, looking at Parquet footers,
delegating to some datasource API, etc).
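As an illustration of the plan-shape check this would need, here is a toy sketch. The node types below are hypothetical stand-ins, not Catalyst classes, and a real implementation would have to match on Spark's actual logical plan nodes and handle many more node kinds.

```scala
object PlanShape {
  // Toy stand-ins for logical plan nodes; these are NOT Spark's Catalyst classes.
  sealed trait Plan
  final case class FileScan(path: String) extends Plan
  final case class Project(columns: Seq[String], child: Plan) extends Plan
  final case class Filter(condition: String, child: Plan) extends Plan

  // True when the plan is a bare file scan, optionally wrapped in projections.
  // Any filter anywhere in the chain disqualifies the plan from the fast path.
  def isUnfilteredScan(plan: Plan): Boolean = plan match {
    case FileScan(_)       => true
    case Project(_, child) => isUnfilteredScan(child)
    case Filter(_, _)      => false
  }
}
```

A special-cased {{isEmpty}} could consult a check like this and fall back to the existing all-partitions plan whenever it returns false.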