[
https://issues.apache.org/jira/browse/SPARK-29266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Josh Rosen updated SPARK-29266:
-------------------------------
Description:
SPARK-23627 added a {{Dataset.isEmpty}} method. This is currently implemented as
{code:java}
def isEmpty: Boolean = withAction("isEmpty",
    limit(1).groupBy().count().queryExecution) { plan =>
  plan.executeCollect().head.getLong(0) == 0
}
{code}
which has a global limit of 1 embedded in the middle of the query plan.
As a result, this will end up computing *all* partitions of the Dataset but
each task can stop early once it's computed a single record (due to how global
limits are planned / executed in this query).
We could instead implement this as {{ds.limit(1).collect().isEmpty}} but that
will go through the "CollectLimit" execution strategy which first computes 1
partition, then 2, then 4, and so on. That will be faster in some cases but
slower in others: if the dataset is indeed empty then that method will be
slower than one which checks all partitions in parallel, but if it's non-empty
(and most tasks' output is non-empty) then it can be much faster.
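
To make the trade-off concrete, here is a minimal sketch in plain Scala (no Spark dependency) that counts the tasks and jobs each strategy would launch. It is a simplified model, not Spark's actual planner code: the object and method names are hypothetical, a dataset is represented only by its per-partition row counts, and the {{growth}} parameter is an assumption that follows the "1, then 2, then 4" progression described above.

```scala
object IsEmptyStrategies {
  /** Tasks launched when every partition is scanned in a single job. */
  def tasksAllPartitions(rowsPerPartition: Seq[Long]): Int =
    rowsPerPartition.length

  /**
   * Models the incremental strategy: scan 1 partition, then 2, then 4, ...
   * and stop as soon as any scanned partition yields a row.
   * Returns (partitions scanned, jobs launched).
   */
  def tasksIncremental(rowsPerPartition: Seq[Long], growth: Int = 2): (Int, Int) = {
    var scanned = 0   // partitions examined so far
    var batch = 1     // partitions to examine in the next job
    var jobs = 0      // sequential jobs launched so far
    var found = false
    while (!found && scanned < rowsPerPartition.length) {
      val slice = rowsPerPartition.slice(scanned, scanned + batch)
      jobs += 1
      scanned += slice.length
      found = slice.exists(_ > 0)
      batch *= growth
    }
    (scanned, jobs)
  }
}
```

In this model, 1000 non-empty partitions give {{tasksIncremental == (1, 1)}} against 1000 tasks for the all-partitions approach, while 1000 empty partitions give {{(1000, 10)}}: the same number of tasks, but spread over ten sequential jobs instead of one.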
There's not an obviously-best implementation here. However, I think there's
high value (and low downside) to optimizing for the special case where the
Dataset is an unfiltered, untransformed input dataset (e.g. the output of
{{spark.read.parquet}}):
I found a production job which calls {{isEmpty}} on the output of
{{spark.read.parquet()}} and the {{isEmpty}} call took several minutes to
complete because it needed to launch hundreds of thousands of tasks, each
computing a single record from one partition (the dataset is enormous, hence
the large task count and long runtime).
I could instruct the job author to use a different, more efficient method of
checking for non-emptiness, but this feels like the type of optimization that
Spark should handle itself (reducing the burden placed on users to understand
internal details of Spark's execution model).
Maybe we can special-case {{isEmpty}} for the case where the plan consists of
only a file source scan (or a file source scan followed by a projection, but
without any filters, etc.). In those cases, we can use either the
{{.limit(1).take(1)}} implementation (under the assumption that we don't have a
ton of empty input files) or something fancier (metadata-only query, looking at
Parquet footers, delegating to some datasource API, etc.).
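
As a rough illustration of the plan-shape check proposed above, the following toy model decides whether a plan is "only a file source scan, optionally under projections, with no filters". The case classes are hypothetical stand-ins written for this sketch, not Spark's Catalyst classes, and a real implementation would need to match on the actual logical plan nodes.

```scala
object IsEmptyEligibility {
  // Hypothetical stand-ins for logical plan nodes; NOT Spark's Catalyst classes.
  sealed trait Plan
  final case class FileScan(paths: Seq[String]) extends Plan
  final case class Project(columns: Seq[String], child: Plan) extends Plan
  final case class Filter(condition: String, child: Plan) extends Plan

  /** True when the plan is a file scan, possibly wrapped in projections only. */
  @annotation.tailrec
  def isPlainScan(plan: Plan): Boolean = plan match {
    case FileScan(_)       => true
    case Project(_, child) => isPlainScan(child) // projections keep every row
    case _                 => false              // filters etc. may drop rows
  }
}
```

Under this sketch, {{isEmpty}} would take the cheaper path only when {{isPlainScan}} returns true and would fall back to the existing implementation otherwise.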
was:
SPARK-23627 added a {{Dataset.isEmpty}} method. This is currently implemented as
{code:java}
def isEmpty: Boolean = withAction("isEmpty",
limit(1).groupBy().count().queryExecution) { plan =>
plan.executeCollect().head.getLong(0) == 0
}
{code}
which has a global limit of 1 embedded in the middle of the query plan.
As a result, this will end up computing *all* partitions of the Dataset but
each task can stop early once it's computed a single record.
We could instead implement this as {{ds.limit(1).collect().isEmpty}} but that
will go through the "CollectLimit" execution strategy which first computes 1
partition, then 2, then 4, and so on. That will be faster in some cases but
slower in others: if the dataset is indeed empty then that method will be
slower than one which checks all partitions in parallel, but if it's non-empty
(and most tasks' output is non-empty) then it can be much faster.
There's not an obviously-best implementation here. However, I think there's
high value (and low downside) to optimizing for the special case where the
Dataset is an unfiltered, untransformed input dataset (e.g. the output of
{{spark.read.parquet}}):
I found a production job which calls {{isEmpty}} on the output of
{{spark.read.parquet()}} and the {{isEmpty}} call took several minutes to
complete because it needed to launch hundreds of thousands of tasks to compute
a single record of each partition (this is an enormous dataset, hence the long
runtime for this).
I could instruct the job author use a different, more efficient method of
checking for non-emptiness, but this feels like the type of optimization that
Spark should handle itself (reducing the burden placed on users to understand
internal details of Spark's execution model).
Maybe we can special-case {{IsEmpty}} for the case where plan consists of only
a file source scan (or a file source scan followed by a projection, but without
any filters, etc.). In those cases, we can use either the {{.limit(1).take()}}
implementation (under assumption that we don't have a ton of empty input files)
or something fancier (metadata-only query, looking at Parquet footers,
delegating to some datasource API, etc).
> Optimize Dataset.isEmpty for base relations / unfiltered datasets
> -----------------------------------------------------------------
>
> Key: SPARK-29266
> URL: https://issues.apache.org/jira/browse/SPARK-29266
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Josh Rosen
> Priority: Minor
>