This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 216b972 [SPARK-38360][SQL][SS][PYTHON] Introduce a `exists` function
for `TreeNode` to eliminate duplicate code patterns
216b972 is described below
commit 216b97220d6a6830c8e99c385ae7a06e805a5ae8
Author: yangjie01 <[email protected]>
AuthorDate: Thu Mar 10 21:05:20 2022 +0800
[SPARK-38360][SQL][SS][PYTHON] Introduce a `exists` function for `TreeNode`
to eliminate duplicate code patterns
### What changes were proposed in this pull request?
There are many duplicate code patterns in Spark code:
```scala
treeNode.find(condition).isDefined
treeNode.find(condition).nonEmpty
treeNode.find(condition).isEmpty
```
This PR introduces an `exists` function for `TreeNode` to simplify them;
after this PR:
- `treeNode.find(condition).isDefined` and
`treeNode.find(condition).nonEmpty` -> `treeNode.exists(condition)`
- `treeNode.find(condition).isEmpty` -> `!treeNode.exists(condition)`
### Why are the changes needed?
Code simplification
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GA and add a new UT for new function.
Closes #35694 from LuciferYang/treenode-exists.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 4 +-
.../spark/sql/catalyst/analysis/Analyzer.scala | 30 ++++----
.../sql/catalyst/analysis/CTESubstitution.scala | 4 +-
.../sql/catalyst/analysis/CheckAnalysis.scala | 4 +-
.../expressions/EquivalentExpressions.scala | 2 +-
.../expressions/aggregate/interfaces.scala | 2 +-
.../spark/sql/catalyst/expressions/subquery.scala | 20 +++---
.../catalyst/optimizer/DecorrelateInnerQuery.scala | 4 +-
.../spark/sql/catalyst/optimizer/InlineCTE.scala | 2 +-
.../catalyst/optimizer/NestedColumnAliasing.scala | 6 +-
.../spark/sql/catalyst/optimizer/Optimizer.scala | 7 +-
.../optimizer/RemoveRedundantAggregates.scala | 4 +-
.../optimizer/ReplaceExceptWithFilter.scala | 4 +-
.../spark/sql/catalyst/optimizer/joins.scala | 6 +-
.../spark/sql/catalyst/optimizer/subquery.scala | 2 +-
.../plans/logical/basicLogicalOperators.scala | 2 +-
.../apache/spark/sql/catalyst/trees/TreeNode.scala | 10 +++
.../catalyst/analysis/AnsiTypeCoercionSuite.scala | 2 +-
.../sql/catalyst/analysis/TypeCoercionSuite.scala | 2 +-
.../optimizer/DecorrelateInnerQuerySuite.scala | 2 +-
.../plans/logical/AnalysisHelperSuite.scala | 2 +-
.../spark/sql/catalyst/trees/TreeNodeSuite.scala | 38 ++++++++++
.../main/scala/org/apache/spark/sql/Dataset.scala | 4 +-
.../apache/spark/sql/execution/CacheManager.scala | 8 +--
.../spark/sql/execution/DataSourceScanExec.scala | 2 +-
.../sql/execution/WholeStageCodegenExec.scala | 2 +-
.../sql/execution/adaptive/AQEOptimizer.scala | 2 +-
.../execution/adaptive/AdaptiveSparkPlanExec.scala | 4 +-
.../adaptive/InsertAdaptiveSparkPlan.scala | 8 +--
.../analysis/DetectAmbiguousSelfJoin.scala | 2 +-
.../bucketing/DisableUnnecessaryBucketedScan.scala | 4 +-
.../dynamicpruning/PartitionPruning.scala | 4 +-
.../dynamicpruning/PlanDynamicPruningFilters.scala | 4 +-
.../execution/python/AggregateInPandasExec.scala | 2 +-
.../sql/execution/python/EvalPythonExec.scala | 2 +-
.../sql/execution/python/ExtractPythonUDFs.scala | 10 +--
.../sql/execution/python/WindowInPandasExec.scala | 2 +-
.../org/apache/spark/sql/execution/subquery.scala | 4 +-
.../org/apache/spark/sql/CTEInlineSuite.scala | 8 +--
.../spark/sql/DataFrameWindowFunctionsSuite.scala | 8 +--
.../org/apache/spark/sql/DatasetCacheSuite.scala | 2 +-
.../spark/sql/DynamicPartitionPruningSuite.scala | 18 ++---
.../scala/org/apache/spark/sql/JoinSuite.scala | 4 +-
...laceNullWithFalseInPredicateEndToEndSuite.scala | 8 +--
.../connector/FileDataSourceV2FallBackSuite.scala | 2 +-
.../DeprecatedWholeStageCodegenSuite.scala | 4 +-
.../apache/spark/sql/execution/PlannerSuite.scala | 6 +-
.../WholeStageCodegenSparkSubmitSuite.scala | 2 +-
.../sql/execution/WholeStageCodegenSuite.scala | 82 +++++++++++-----------
.../sql/execution/benchmark/JoinBenchmark.scala | 23 +++---
.../sql/execution/datasources/json/JsonSuite.scala | 4 +-
.../execution/datasources/orc/OrcQuerySuite.scala | 12 ++--
.../streaming/sources/ForeachBatchSinkSuite.scala | 4 +-
.../spark/sql/sources/BucketedReadSuite.scala | 12 ++--
54 files changed, 234 insertions(+), 188 deletions(-)
diff --git
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 5037af1..db71f0f 100644
---
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -1416,10 +1416,10 @@ class KafkaMicroBatchV2SourceSuite extends
KafkaMicroBatchSourceSuiteBase {
testStream(kafka)(
makeSureGetOffsetCalled,
AssertOnQuery { query =>
- query.logicalPlan.find {
+ query.logicalPlan.exists {
case r: StreamingDataSourceV2Relation =>
r.stream.isInstanceOf[KafkaMicroBatchStream]
case _ => false
- }.isDefined
+ }
}
)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 27e8ed2..21454be 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -509,7 +509,7 @@ class Analyzer(override val catalogManager: CatalogManager)
}
private def hasUnresolvedAlias(exprs: Seq[NamedExpression]) =
- exprs.exists(_.find(_.isInstanceOf[UnresolvedAlias]).isDefined)
+ exprs.exists(_.exists(_.isInstanceOf[UnresolvedAlias]))
def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsUpWithPruning(
_.containsPattern(UNRESOLVED_ALIAS), ruleId) {
@@ -616,7 +616,7 @@ class Analyzer(override val catalogManager: CatalogManager)
val aggsBuffer = ArrayBuffer[Expression]()
// Returns whether the expression belongs to any expressions in
`aggsBuffer` or not.
def isPartOfAggregation(e: Expression): Boolean = {
- aggsBuffer.exists(a => a.find(_ eq e).isDefined)
+ aggsBuffer.exists(a => a.exists(_ eq e))
}
replaceGroupingFunc(agg, groupByExprs, gid).transformDown {
// AggregateExpression should be computed on the unmodified value of
its argument
@@ -966,14 +966,14 @@ class Analyzer(override val catalogManager:
CatalogManager)
}
private def hasMetadataCol(plan: LogicalPlan): Boolean = {
- plan.expressions.exists(_.find {
+ plan.expressions.exists(_.exists {
case a: Attribute =>
// If an attribute is resolved before being labeled as metadata
// (i.e. from the originating Dataset), we check with expression ID
a.isMetadataCol ||
plan.children.exists(c => c.metadataOutput.exists(_.exprId ==
a.exprId))
case _ => false
- }.isDefined)
+ })
}
private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match {
@@ -1674,7 +1674,7 @@ class Analyzer(override val catalogManager:
CatalogManager)
}
private def containsDeserializer(exprs: Seq[Expression]): Boolean = {
- exprs.exists(_.find(_.isInstanceOf[UnresolvedDeserializer]).isDefined)
+ exprs.exists(_.exists(_.isInstanceOf[UnresolvedDeserializer]))
}
// support CURRENT_DATE, CURRENT_TIMESTAMP, and grouping__id
@@ -1869,7 +1869,7 @@ class Analyzer(override val catalogManager:
CatalogManager)
withPosition(ordinal) {
if (index > 0 && index <= aggs.size) {
val ordinalExpr = aggs(index - 1)
- if
(ordinalExpr.find(_.isInstanceOf[AggregateExpression]).nonEmpty) {
+ if (ordinalExpr.exists(_.isInstanceOf[AggregateExpression])) {
throw
QueryCompilationErrors.groupByPositionRefersToAggregateFunctionError(
index, ordinalExpr)
} else {
@@ -2687,7 +2687,7 @@ class Analyzer(override val catalogManager:
CatalogManager)
*/
object ExtractGenerator extends Rule[LogicalPlan] {
private def hasGenerator(expr: Expression): Boolean = {
- expr.find(_.isInstanceOf[Generator]).isDefined
+ expr.exists(_.isInstanceOf[Generator])
}
private def hasNestedGenerator(expr: NamedExpression): Boolean = {
@@ -2697,10 +2697,10 @@ class Analyzer(override val catalogManager:
CatalogManager)
case go: GeneratorOuter =>
hasInnerGenerator(go.child)
case _ =>
- g.children.exists { _.find {
+ g.children.exists { _.exists {
case _: Generator => true
case _ => false
- }.isDefined }
+ } }
}
trimNonTopLevelAliases(expr) match {
case UnresolvedAlias(g: Generator, _) => hasInnerGenerator(g)
@@ -2711,12 +2711,12 @@ class Analyzer(override val catalogManager:
CatalogManager)
}
private def hasAggFunctionInGenerator(ne: Seq[NamedExpression]): Boolean =
{
- ne.exists(_.find {
+ ne.exists(_.exists {
case g: Generator =>
-
g.children.exists(_.find(_.isInstanceOf[AggregateFunction]).isDefined)
+ g.children.exists(_.exists(_.isInstanceOf[AggregateFunction]))
case _ =>
false
- }.nonEmpty)
+ })
}
private def trimAlias(expr: NamedExpression): Expression = expr match {
@@ -2917,10 +2917,10 @@ class Analyzer(override val catalogManager:
CatalogManager)
exprs.exists(hasWindowFunction)
private def hasWindowFunction(expr: Expression): Boolean = {
- expr.find {
+ expr.exists {
case window: WindowExpression => true
case _ => false
- }.isDefined
+ }
}
/**
@@ -3756,7 +3756,7 @@ class Analyzer(override val catalogManager:
CatalogManager)
}
private def hasUnresolvedFieldName(a: AlterTableCommand): Boolean = {
-
a.expressions.exists(_.find(_.isInstanceOf[UnresolvedFieldName]).isDefined)
+ a.expressions.exists(_.exists(_.isInstanceOf[UnresolvedFieldName]))
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
index 2397527..c0ba359 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
@@ -51,10 +51,10 @@ object CTESubstitution extends Rule[LogicalPlan] {
if (!plan.containsPattern(UNRESOLVED_WITH)) {
return plan
}
- val isCommand = plan.find {
+ val isCommand = plan.exists {
case _: Command | _: ParsedStatement | _: InsertIntoDir => true
case _ => false
- }.isDefined
+ }
val cteDefs = mutable.ArrayBuffer.empty[CTERelationDef]
val (substituted, lastSubstituted) =
LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match
{
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 7319248..c05b932 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -334,7 +334,7 @@ trait CheckAnalysis extends PredicateHelper with
LookupCatalog {
}
def checkValidGroupingExprs(expr: Expression): Unit = {
- if (expr.find(_.isInstanceOf[AggregateExpression]).isDefined) {
+ if (expr.exists(_.isInstanceOf[AggregateExpression])) {
failAnalysis(
"aggregate functions are not allowed in GROUP BY, but found
" + expr.sql)
}
@@ -718,7 +718,7 @@ trait CheckAnalysis extends PredicateHelper with
LookupCatalog {
// Check whether the given expressions contains the subquery expression.
def containsExpr(expressions: Seq[Expression]): Boolean = {
- expressions.exists(_.find(_.semanticEquals(expr)).isDefined)
+ expressions.exists(_.exists(_.semanticEquals(expr)))
}
// Validate the subquery plan.
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
index 59e2be4..903a6fd 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
@@ -194,7 +194,7 @@ class EquivalentExpressions {
expr.isInstanceOf[LeafExpression] ||
// `LambdaVariable` is usually used as a loop variable, which can't be
evaluated ahead of the
// loop. So we can't evaluate sub-expressions containing
`LambdaVariable` at the beginning.
- expr.find(_.isInstanceOf[LambdaVariable]).isDefined ||
+ expr.exists(_.isInstanceOf[LambdaVariable]) ||
// `PlanExpression` wraps query plan. To compare query plans of
`PlanExpression` on executor,
// can cause error like NPE.
(expr.isInstanceOf[PlanExpression[_]] && Utils.isInRunningSparkTask)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
index 3ba9065..f97293d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
@@ -83,7 +83,7 @@ object AggregateExpression {
}
def containsAggregate(expr: Expression): Boolean = {
- expr.find(isAggregate).isDefined
+ expr.exists(isAggregate)
}
def isAggregate(expr: Expression): Boolean = {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
index d7112a2..71b36fa 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
@@ -88,11 +88,11 @@ object SubqueryExpression {
* and false otherwise.
*/
def hasInOrCorrelatedExistsSubquery(e: Expression): Boolean = {
- e.find {
+ e.exists {
case _: ListQuery => true
case ex: Exists => ex.isCorrelated
case _ => false
- }.isDefined
+ }
}
/**
@@ -101,20 +101,20 @@ object SubqueryExpression {
* [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
*/
def hasCorrelatedSubquery(e: Expression): Boolean = {
- e.find {
+ e.exists {
case s: SubqueryExpression => s.isCorrelated
case _ => false
- }.isDefined
+ }
}
/**
* Returns true when an expression contains a subquery
*/
def hasSubquery(e: Expression): Boolean = {
- e.find {
+ e.exists {
case _: SubqueryExpression => true
case _ => false
- }.isDefined
+ }
}
}
@@ -124,7 +124,7 @@ object SubExprUtils extends PredicateHelper {
* returns false otherwise.
*/
def containsOuter(e: Expression): Boolean = {
- e.find(_.isInstanceOf[OuterReference]).isDefined
+ e.exists(_.isInstanceOf[OuterReference])
}
/**
@@ -161,7 +161,7 @@ object SubExprUtils extends PredicateHelper {
* Given a logical plan, returns TRUE if it has an outer reference and false
otherwise.
*/
def hasOuterReferences(plan: LogicalPlan): Boolean = {
- plan.find(_.expressions.exists(containsOuter)).isDefined
+ plan.exists(_.expressions.exists(containsOuter))
}
/**
@@ -282,10 +282,10 @@ case class ScalarSubquery(
object ScalarSubquery {
def hasCorrelatedScalarSubquery(e: Expression): Boolean = {
- e.find {
+ e.exists {
case s: ScalarSubquery => s.isCorrelated
case _ => false
- }.isDefined
+ }
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
index 5ad70f0..9a4d1a3 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
@@ -87,7 +87,7 @@ object DecorrelateInnerQuery extends PredicateHelper {
* leaf node and will not be found here.
*/
private def containsAttribute(expression: Expression): Boolean = {
- expression.find(_.isInstanceOf[Attribute]).isDefined
+ expression.exists(_.isInstanceOf[Attribute])
}
/**
@@ -268,7 +268,7 @@ object DecorrelateInnerQuery extends PredicateHelper {
// The decorrelation framework adds domain inner joins by traversing
down the plan tree
// recursively until it reaches a node that is not correlated with
the outer query.
// So the child node of a domain inner join shouldn't contain
another domain join.
- assert(child.find(_.isInstanceOf[DomainJoin]).isEmpty,
+ assert(!child.exists(_.isInstanceOf[DomainJoin]),
s"Child of a domain inner join shouldn't contain another domain
join.\n$child")
child
case o =>
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala
index 1de300e..61577b1 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala
@@ -54,7 +54,7 @@ object InlineCTE extends Rule[LogicalPlan] {
// 2) Any `CTERelationRef` that contains `OuterReference` would have been
inlined first.
refCount == 1 ||
cteDef.deterministic ||
-
cteDef.child.find(_.expressions.exists(_.isInstanceOf[OuterReference])).isDefined
+ cteDef.child.exists(_.expressions.exists(_.isInstanceOf[OuterReference]))
}
private def buildCTEMap(
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
index a2ee950..4c7130e 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
@@ -247,7 +247,7 @@ object NestedColumnAliasing {
exprList.foreach { e =>
collectRootReferenceAndExtractValue(e).foreach {
// we can not alias the attr from lambda variable whose expr id is not
available
- case ev: ExtractValue if
ev.find(_.isInstanceOf[NamedLambdaVariable]).isEmpty =>
+ case ev: ExtractValue if
!ev.exists(_.isInstanceOf[NamedLambdaVariable]) =>
if (ev.references.size == 1) {
nestedFieldReferences.append(ev)
}
@@ -267,7 +267,7 @@ object NestedColumnAliasing {
// that do should not have an alias generated as it can lead to
pushing the aggregate down
// into a projection.
def containsAggregateFunction(ev: ExtractValue): Boolean =
- ev.find(_.isInstanceOf[AggregateFunction]).isDefined
+ ev.exists(_.isInstanceOf[AggregateFunction])
// Remove redundant [[ExtractValue]]s if they share the same parent
nest field.
// For example, when `a.b` and `a.b.c` are in project list, we only
need to alias `a.b`.
@@ -277,7 +277,7 @@ object NestedColumnAliasing {
// [[GetStructField]]
case e @ (_: GetStructField | _: GetArrayStructFields) =>
val child = e.children.head
- nestedFields.forall(f => child.find(_.semanticEquals(f)).isEmpty)
+ nestedFields.forall(f => !child.exists(_.semanticEquals(f)))
case _ => true
}
.distinct
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f7ff566..e245d14 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -52,7 +52,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
previousPlan: LogicalPlan,
currentPlan: LogicalPlan): Boolean = {
!Utils.isTesting || (currentPlan.resolved &&
-
currentPlan.find(PlanHelper.specialExpressionsInUnsupportedOperator(_).nonEmpty).isEmpty
&&
+
!currentPlan.exists(PlanHelper.specialExpressionsInUnsupportedOperator(_).nonEmpty)
&&
LogicalPlanIntegrity.checkIfExprIdsAreGloballyUnique(currentPlan) &&
DataType.equalsIgnoreNullability(previousPlan.schema,
currentPlan.schema))
}
@@ -1690,11 +1690,10 @@ object PushPredicateThroughNonJoin extends
Rule[LogicalPlan] with PredicateHelpe
*/
private def canPushThroughCondition(plan: LogicalPlan, condition:
Expression): Boolean = {
val attributes = plan.outputSet
- val matched = condition.find {
+ !condition.exists {
case s: SubqueryExpression =>
s.plan.outputSet.intersect(attributes).nonEmpty
case _ => false
}
- matched.isEmpty
}
}
@@ -1956,7 +1955,7 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {
}
private def hasUnevaluableExpr(expr: Expression): Boolean = {
- expr.find(e => e.isInstanceOf[Unevaluable] &&
!e.isInstanceOf[AttributeReference]).isDefined
+ expr.exists(e => e.isInstanceOf[Unevaluable] &&
!e.isInstanceOf[AttributeReference])
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
index bf17791..beec90d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
@@ -52,10 +52,10 @@ object RemoveRedundantAggregates extends Rule[LogicalPlan]
with AliasHelper {
private def isLowerRedundant(upper: Aggregate, lower: Aggregate): Boolean = {
val upperHasNoDuplicateSensitiveAgg = upper
.aggregateExpressions
- .forall(expr => expr.find {
+ .forall(expr => !expr.exists {
case ae: AggregateExpression => isDuplicateSensitive(ae)
case e => AggregateExpression.isAggregate(e)
- }.isEmpty)
+ })
lazy val upperRefsOnlyDeterministicNonAgg =
upper.references.subsetOf(AttributeSet(
lower
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
index 8218051..f66128d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
@@ -87,8 +87,8 @@ object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
val rightProjectList = projectList(right)
left.output.size == left.output.map(_.name).distinct.size &&
- left.find(_.expressions.exists(SubqueryExpression.hasSubquery)).isEmpty
&&
-
right.find(_.expressions.exists(SubqueryExpression.hasSubquery)).isEmpty &&
+ !left.exists(_.expressions.exists(SubqueryExpression.hasSubquery)) &&
+ !right.exists(_.expressions.exists(SubqueryExpression.hasSubquery)) &&
Project(leftProjectList,
nonFilterChild(skipProject(left))).sameResult(
Project(rightProjectList, nonFilterChild(skipProject(right))))
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index e03360d..6d683a7 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -143,7 +143,7 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with
PredicateHelper {
val attributes = e.references.toSeq
val emptyRow = new GenericInternalRow(attributes.length)
val boundE = BindReferences.bindReference(e, attributes)
- if (boundE.find(_.isInstanceOf[Unevaluable]).isDefined) return false
+ if (boundE.exists(_.isInstanceOf[Unevaluable])) return false
val v = boundE.eval(emptyRow)
v == null || v == false
}
@@ -195,9 +195,9 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with
PredicateHelper {
object ExtractPythonUDFFromJoinCondition extends Rule[LogicalPlan] with
PredicateHelper {
private def hasUnevaluablePythonUDF(expr: Expression, j: Join): Boolean = {
- expr.find { e =>
+ expr.exists { e =>
PythonUDF.isScalarPythonUDF(e) && !canEvaluate(e, j.left) &&
!canEvaluate(e, j.right)
- }.isDefined
+ }
}
override def apply(plan: LogicalPlan): LogicalPlan =
plan.transformUpWithPruning(
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
index 6d6b8b7..7ef5ef5 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
@@ -728,7 +728,7 @@ object OptimizeOneRowRelationSubquery extends
Rule[LogicalPlan] {
}
private def hasCorrelatedSubquery(plan: LogicalPlan): Boolean = {
-
plan.find(_.expressions.exists(SubqueryExpression.hasCorrelatedSubquery)).isDefined
+ plan.exists(_.expressions.exists(SubqueryExpression.hasCorrelatedSubquery))
}
/**
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 5b601fb..8a6598f 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -989,7 +989,7 @@ case class Aggregate(
final override val nodePatterns : Seq[TreePattern] = Seq(AGGREGATE)
override lazy val validConstraints: ExpressionSet = {
- val nonAgg =
aggregateExpressions.filter(_.find(_.isInstanceOf[AggregateExpression]).isEmpty)
+ val nonAgg =
aggregateExpressions.filter(!_.exists(_.isInstanceOf[AggregateExpression]))
getAllValidConstraints(nonAgg)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 9e50be3..ac60e18 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -247,6 +247,16 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
extends Product with Tre
}
/**
+ * Tests whether there is a [[TreeNode]] that satisfies the condition specified
in `f`.
+ * The condition is recursively applied to this node and all of its children
(pre-order).
+ */
+ def exists(f: BaseType => Boolean): Boolean = if (f(this)) {
+ true
+ } else {
+ children.exists(_.exists(f))
+ }
+
+ /**
* Runs the given function on this node and then recursively on [[children]].
* @param f the function to be applied to each node in the tree.
*/
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercionSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercionSuite.scala
index 19ee6d8..1f23aeb6 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercionSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercionSuite.scala
@@ -890,7 +890,7 @@ class AnsiTypeCoercionSuite extends TypeCoercionSuiteBase {
val wp1 = widenSetOperationTypes(union.select(p1.output.head, $"p2.v"))
assert(wp1.isInstanceOf[Project])
// The attribute `p1.output.head` should be replaced in the root `Project`.
- assert(wp1.expressions.forall(_.find(_ == p1.output.head).isEmpty))
+ assert(wp1.expressions.forall(!_.exists(_ == p1.output.head)))
val wp2 = widenSetOperationTypes(Aggregate(Nil,
sum(p1.output.head).as("v") :: Nil, union))
assert(wp2.isInstanceOf[Aggregate])
assert(wp2.missingInput.isEmpty)
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
index 63ad84e..782f3e4 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
@@ -1490,7 +1490,7 @@ class TypeCoercionSuite extends TypeCoercionSuiteBase {
val wp1 = widenSetOperationTypes(union.select(p1.output.head, $"p2.v"))
assert(wp1.isInstanceOf[Project])
// The attribute `p1.output.head` should be replaced in the root `Project`.
- assert(wp1.expressions.forall(_.find(_ == p1.output.head).isEmpty))
+ assert(wp1.expressions.forall(!_.exists(_ == p1.output.head)))
val wp2 = widenSetOperationTypes(Aggregate(Nil,
sum(p1.output.head).as("v") :: Nil, union))
assert(wp2.isInstanceOf[Aggregate])
assert(wp2.missingInput.isEmpty)
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala
index dc50039..c74eeea 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala
@@ -37,7 +37,7 @@ class DecorrelateInnerQuerySuite extends PlanTest {
val testRelation2 = LocalRelation(x, y, z)
private def hasOuterReferences(plan: LogicalPlan): Boolean = {
- plan.find(_.expressions.exists(SubExprUtils.containsOuter)).isDefined
+ plan.exists(_.expressions.exists(SubExprUtils.containsOuter))
}
private def check(
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelperSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelperSuite.scala
index 0a3f86e..4a42645 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelperSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelperSuite.scala
@@ -41,7 +41,7 @@ class AnalysisHelperSuite extends SparkFunSuite {
test("setAnalyze is recursive") {
val plan = Project(Nil, LocalRelation())
plan.setAnalyzed()
- assert(plan.find(!_.analyzed).isEmpty)
+ assert(!plan.exists(!_.analyzed))
}
test("resolveOperator runs on operators recursively") {
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index b52ecb5..b6087c5 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -248,6 +248,44 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
assert(expected === actual)
}
+ test("exists") {
+ val expression = Add(Literal(1), Multiply(Literal(2), Subtract(Literal(3),
Literal(4))))
+ // Check the top node.
+ var exists = expression.exists {
+ case _: Add => true
+ case _ => false
+ }
+ assert(exists)
+
+ // Check the first child.
+ exists = expression.exists {
+ case Literal(1, IntegerType) => true
+ case _ => false
+ }
+ assert(exists)
+
+ // Check an internal node (Subtract).
+ exists = expression.exists {
+ case _: Subtract => true
+ case _ => false
+ }
+ assert(exists)
+
+ // Check a leaf node.
+ exists = expression.exists {
+ case Literal(3, IntegerType) => true
+ case _ => false
+ }
+ assert(exists)
+
+ // Check not exists.
+ exists = expression.exists {
+ case Literal(100, IntegerType) => true
+ case _ => false
+ }
+ assert(!exists)
+ }
+
test("collectFirst") {
val expression = Add(Literal(1), Multiply(Literal(2), Subtract(Literal(3),
Literal(4))))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 4a921b4..62dea96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1503,10 +1503,10 @@ class Dataset[T] private[sql](
case typedCol: TypedColumn[_, _] =>
// Checks if a `TypedColumn` has been inserted with
// specific input type and schema by `withInputType`.
- val needInputType = typedCol.expr.find {
+ val needInputType = typedCol.expr.exists {
case ta: TypedAggregateExpression if ta.inputDeserializer.isEmpty =>
true
case _ => false
- }.isDefined
+ }
if (!needInputType) {
typedCol
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 426b233..27d6bed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -161,7 +161,7 @@ class CacheManager extends Logging with
AdaptiveSparkPlanHelper {
blocking: Boolean = false): Unit = {
val shouldRemove: LogicalPlan => Boolean =
if (cascade) {
- _.find(_.sameResult(plan)).isDefined
+ _.exists(_.sameResult(plan))
} else {
_.sameResult(plan)
}
@@ -187,7 +187,7 @@ class CacheManager extends Logging with
AdaptiveSparkPlanHelper {
// will keep it as it is. It means the physical plan has been
re-compiled already in the
// other thread.
val cacheAlreadyLoaded =
cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
- cd.plan.find(_.sameResult(plan)).isDefined && !cacheAlreadyLoaded
+ cd.plan.exists(_.sameResult(plan)) && !cacheAlreadyLoaded
})
}
}
@@ -207,7 +207,7 @@ class CacheManager extends Logging with
AdaptiveSparkPlanHelper {
* Tries to re-cache all the cache entries that refer to the given plan.
*/
def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit = {
- recacheByCondition(spark, _.plan.find(_.sameResult(plan)).isDefined)
+ recacheByCondition(spark, _.plan.exists(_.sameResult(plan)))
}
/**
@@ -288,7 +288,7 @@ class CacheManager extends Logging with
AdaptiveSparkPlanHelper {
*/
def recacheByPath(spark: SparkSession, resourcePath: Path, fs: FileSystem):
Unit = {
val qualifiedPath = fs.makeQualified(resourcePath)
- recacheByCondition(spark, _.plan.find(lookupAndRefresh(_, fs,
qualifiedPath)).isDefined)
+ recacheByCondition(spark, _.plan.exists(lookupAndRefresh(_, fs,
qualifiedPath)))
}
/**
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index db86b38..1e2fa41 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -239,7 +239,7 @@ case class FileSourceScanExec(
}
private def isDynamicPruningFilter(e: Expression): Boolean =
- e.find(_.isInstanceOf[PlanExpression[_]]).isDefined
+ e.exists(_.isInstanceOf[PlanExpression[_]])
@transient lazy val selectedPartitions: Array[PartitionDirectory] = {
val optimizerMetadataTimeNs =
relation.location.metadataOpsTimeNs.getOrElse(0L)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index dde976c..7d36fd5 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -892,7 +892,7 @@ case class CollapseCodegenStages(
private def supportCodegen(plan: SparkPlan): Boolean = plan match {
case plan: CodegenSupport if plan.supportCodegen =>
- val willFallback = plan.expressions.exists(_.find(e =>
!supportCodegen(e)).isDefined)
+ val willFallback = plan.expressions.exists(_.exists(e =>
!supportCodegen(e)))
// the generated code will be huge if there are too many columns
val hasTooManyOutputFields =
WholeStageCodegenExec.isTooManyFields(conf, plan.schema)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala
index 06e9c18..5533bb1 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala
@@ -71,7 +71,7 @@ class AQEOptimizer(conf: SQLConf) extends
RuleExecutor[LogicalPlan] {
previousPlan: LogicalPlan,
currentPlan: LogicalPlan): Boolean = {
!Utils.isTesting || (currentPlan.resolved &&
-
currentPlan.find(PlanHelper.specialExpressionsInUnsupportedOperator(_).nonEmpty).isEmpty
&&
+
!currentPlan.exists(PlanHelper.specialExpressionsInUnsupportedOperator(_).nonEmpty)
&&
LogicalPlanIntegrity.checkIfExprIdsAreGloballyUnique(currentPlan) &&
DataType.equalsIgnoreNullability(previousPlan.schema,
currentPlan.schema))
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 3ec5aad..14b4525 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -332,7 +332,7 @@ case class AdaptiveSparkPlanExec(
// Subqueries that don't belong to any query stage of the main query will
execute after the
// last UI update in `getFinalPhysicalPlan`, so we need to update UI here
again to make sure
// the newly generated nodes of those subqueries are updated.
- if (!isSubquery &&
currentPhysicalPlan.find(_.subqueries.nonEmpty).isDefined) {
+ if (!isSubquery && currentPhysicalPlan.exists(_.subqueries.nonEmpty)) {
getExecutionId.foreach(onUpdatePlan(_, Seq.empty))
}
logOnLevel(s"Final plan: $currentPhysicalPlan")
@@ -611,7 +611,7 @@ case class AdaptiveSparkPlanExec(
stagesToReplace: Seq[QueryStageExec]): LogicalPlan = {
var logicalPlan = plan
stagesToReplace.foreach {
- case stage if currentPhysicalPlan.find(_.eq(stage)).isDefined =>
+ case stage if currentPhysicalPlan.exists(_.eq(stage)) =>
val logicalNodeOpt =
stage.getTagValue(TEMP_LOGICAL_PLAN_TAG).orElse(stage.logicalLink)
assert(logicalNodeOpt.isDefined)
val logicalNode = logicalNodeOpt.get
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index 5c20845..4410f7f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -88,14 +88,14 @@ case class InsertAdaptiveSparkPlan(
// - The query contains sub-query.
private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = {
conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || {
- plan.find {
+ plan.exists {
case _: Exchange => true
case p if !p.requiredChildDistribution.forall(_ ==
UnspecifiedDistribution) => true
- case p => p.expressions.exists(_.find {
+ case p => p.expressions.exists(_.exists {
case _: SubqueryExpression => true
case _ => false
- }.isDefined)
- }.isDefined
+ })
+ }
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
index 17ea93e..7e9628c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
@@ -78,7 +78,7 @@ object DetectAmbiguousSelfJoin extends Rule[LogicalPlan] {
// We always remove the special metadata from `AttributeReference` at the
end of this rule, so
// Dataset column reference only exists in the root node via Dataset
transformations like
// `Dataset#select`.
- if (plan.find(_.isInstanceOf[Join]).isEmpty) return
stripColumnReferenceMetadataInPlan(plan)
+ if (!plan.exists(_.isInstanceOf[Join])) return
stripColumnReferenceMetadataInPlan(plan)
val colRefAttrs = plan.expressions.flatMap(_.collect {
case a: AttributeReference if isColumnReference(a) => a
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
index 479bc21..1eb1082 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
@@ -141,10 +141,10 @@ object DisableUnnecessaryBucketedScan extends
Rule[SparkPlan] {
}
def apply(plan: SparkPlan): SparkPlan = {
- lazy val hasBucketedScan = plan.find {
+ lazy val hasBucketedScan = plan.exists {
case scan: FileSourceScanExec => scan.bucketedScan
case _ => false
- }.isDefined
+ }
if (!conf.bucketingEnabled || !conf.autoBucketedScanEnabled ||
!hasBucketedScan) {
plan
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
index 48d31d8..3b5fc4a 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
@@ -214,10 +214,10 @@ object PartitionPruning extends Rule[LogicalPlan] with
PredicateHelper {
* Search a filtering predicate in a given logical plan
*/
private def hasSelectivePredicate(plan: LogicalPlan): Boolean = {
- plan.find {
+ plan.exists {
case f: Filter => isLikelySelective(f.condition)
case _ => false
- }.isDefined
+ }
}
/**
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
index 9a05e39..252565f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
@@ -58,13 +58,13 @@ case class PlanDynamicPruningFilters(sparkSession:
SparkSession)
// Using `sparkPlan` is a little hacky as it is based on the
assumption that this rule is
// the first to be applied (apart from `InsertAdaptiveSparkPlan`).
val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty
&&
- plan.find {
+ plan.exists {
case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _, _) =>
left.sameResult(sparkPlan)
case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right, _) =>
right.sameResult(sparkPlan)
case _ => false
- }.isDefined
+ }
if (canReuseExchange) {
val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession,
sparkPlan)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
index 69802b1..a7f63aa 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
@@ -88,7 +88,7 @@ case class AggregateInPandasExec(
(ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
case children =>
// There should not be any other UDFs, or the children can't be
evaluated directly.
- assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
+ assert(children.forall(!_.exists(_.isInstanceOf[PythonUDF])))
(ChainedPythonFunctions(Seq(udf.func)), udf.children)
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index fca43e4..c567a70 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -72,7 +72,7 @@ trait EvalPythonExec extends UnaryExecNode {
(ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
case children =>
// There should not be any other UDFs, or the children can't be
evaluated directly.
- assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
+ assert(children.forall(!_.exists(_.isInstanceOf[PythonUDF])))
(ChainedPythonFunctions(Seq(udf.func)), udf.children)
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index 407c498..a809ea0 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -45,10 +45,10 @@ object ExtractPythonUDFFromAggregate extends
Rule[LogicalPlan] {
}
private def hasPythonUdfOverAggregate(expr: Expression, agg: Aggregate):
Boolean = {
- expr.find {
+ expr.exists {
e => PythonUDF.isScalarPythonUDF(e) &&
- (e.references.isEmpty || e.find(belongAggregate(_, agg)).isDefined)
- }.isDefined
+ (e.references.isEmpty || e.exists(belongAggregate(_, agg)))
+ }
}
private def extract(agg: Aggregate): LogicalPlan = {
@@ -90,7 +90,7 @@ object ExtractPythonUDFFromAggregate extends
Rule[LogicalPlan] {
*/
object ExtractGroupingPythonUDFFromAggregate extends Rule[LogicalPlan] {
private def hasScalarPythonUDF(e: Expression): Boolean = {
- e.find(PythonUDF.isScalarPythonUDF).isDefined
+ e.exists(PythonUDF.isScalarPythonUDF)
}
private def extract(agg: Aggregate): LogicalPlan = {
@@ -164,7 +164,7 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with
PredicateHelper {
private type EvalTypeChecker = EvalType => Boolean
private def hasScalarPythonUDF(e: Expression): Boolean = {
- e.find(PythonUDF.isScalarPythonUDF).isDefined
+ e.exists(PythonUDF.isScalarPythonUDF)
}
@scala.annotation.tailrec
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
index 87102cc..e73da99 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
@@ -113,7 +113,7 @@ case class WindowInPandasExec(
(ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
case children =>
// There should not be any other UDFs, or the children can't be
evaluated directly.
- assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
+ assert(children.forall(!_.exists(_.isInstanceOf[PythonUDF])))
(ChainedPythonFunctions(Seq(udf.func)), udf.children)
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index 8878677..afd0aba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -46,10 +46,10 @@ object ExecSubqueryExpression {
* Returns true when an expression contains a subquery
*/
def hasSubquery(e: Expression): Boolean = {
- e.find {
+ e.exists {
case _: ExecSubqueryExpression => true
case _ => false
- }.isDefined
+ }
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
index 7ee533a..dd30ff6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
@@ -42,7 +42,7 @@ abstract class CTEInlineSuiteBase
""".stripMargin)
checkAnswer(df, Nil)
assert(
- df.queryExecution.optimizedPlan.find(_.isInstanceOf[WithCTE]).nonEmpty,
+ df.queryExecution.optimizedPlan.exists(_.isInstanceOf[WithCTE]),
"Non-deterministic With-CTE with multiple references should be not
inlined.")
}
}
@@ -59,7 +59,7 @@ abstract class CTEInlineSuiteBase
""".stripMargin)
checkAnswer(df, Nil)
assert(
- df.queryExecution.optimizedPlan.find(_.isInstanceOf[WithCTE]).nonEmpty,
+ df.queryExecution.optimizedPlan.exists(_.isInstanceOf[WithCTE]),
"Non-deterministic With-CTE with multiple references should be not
inlined.")
}
}
@@ -76,10 +76,10 @@ abstract class CTEInlineSuiteBase
""".stripMargin)
checkAnswer(df, Row(0, 1) :: Row(1, 2) :: Nil)
assert(
- df.queryExecution.analyzed.find(_.isInstanceOf[WithCTE]).nonEmpty,
+ df.queryExecution.analyzed.exists(_.isInstanceOf[WithCTE]),
"With-CTE should not be inlined in analyzed plan.")
assert(
- df.queryExecution.optimizedPlan.find(_.isInstanceOf[WithCTE]).isEmpty,
+ !df.queryExecution.optimizedPlan.exists(_.isInstanceOf[WithCTE]),
"With-CTE with one reference should be inlined in optimized plan.")
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index f33de74..11b2309 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -1110,16 +1110,16 @@ class DataFrameWindowFunctionsSuite extends QueryTest
checkAnswer(windowed, Seq(Row("b", 4), Row(null, null), Row(null, null),
Row(null, null)))
- val shuffleByRequirement = windowed.queryExecution.executedPlan.find {
+ val shuffleByRequirement = windowed.queryExecution.executedPlan.exists {
case w: WindowExec =>
- w.child.find {
+ w.child.exists {
case s: ShuffleExchangeExec => isShuffleExecByRequirement(s,
Seq("key1", "key2"))
case _ => false
- }.nonEmpty
+ }
case _ => false
}
- assert(shuffleByRequirement.nonEmpty, "Can't find desired shuffle node
from the query plan")
+ assert(shuffleByRequirement, "Can't find desired shuffle node from the
query plan")
}
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
index 009ccb9..2f4098d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
@@ -250,7 +250,7 @@ class DatasetCacheSuite extends QueryTest
case i: InMemoryRelation => i.cacheBuilder.cachedPlan
}
assert(df2LimitInnerPlan.isDefined &&
-
df2LimitInnerPlan.get.find(_.isInstanceOf[InMemoryTableScanExec]).isEmpty)
+ !df2LimitInnerPlan.get.exists(_.isInstanceOf[InMemoryTableScanExec]))
}
test("SPARK-27739 Save stats from optimized plan") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index 3569775..6188516 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -207,10 +207,10 @@ abstract class DynamicPartitionPruningSuiteBase
case _: ReusedExchangeExec => // reuse check ok.
case BroadcastQueryStageExec(_, _: ReusedExchangeExec, _) => // reuse
check ok.
case b: BroadcastExchangeLike =>
- val hasReuse = plan.find {
+ val hasReuse = plan.exists {
case ReusedExchangeExec(_, e) => e eq b
case _ => false
- }.isDefined
+ }
assert(hasReuse, s"$s\nshould have been reused in\n$plan")
case a: AdaptiveSparkPlanExec =>
val broadcastQueryStage = collectFirst(a) {
@@ -234,7 +234,7 @@ abstract class DynamicPartitionPruningSuiteBase
case r: ReusedSubqueryExec => r.child
case o => o
}
- assert(subquery.find(_.isInstanceOf[AdaptiveSparkPlanExec]).isDefined ==
isMainQueryAdaptive)
+ assert(subquery.exists(_.isInstanceOf[AdaptiveSparkPlanExec]) ==
isMainQueryAdaptive)
}
}
@@ -344,12 +344,12 @@ abstract class DynamicPartitionPruningSuiteBase
| )
""".stripMargin)
- val found = df.queryExecution.executedPlan.find {
+ val found = df.queryExecution.executedPlan.exists {
case BroadcastHashJoinExec(_, _, p: ExistenceJoin, _, _, _, _, _) =>
true
case _ => false
}
- assert(found.isEmpty)
+ assert(!found)
}
}
@@ -1560,14 +1560,14 @@ abstract class
DynamicPartitionPruningDataSourceSuiteBase
}
// search dynamic pruning predicates on the executed plan
val plan =
query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution.executedPlan
- val ret = plan.find {
+ val ret = plan.exists {
case s: FileSourceScanExec => s.partitionFilters.exists {
case _: DynamicPruningExpression => true
case _ => false
}
case _ => false
}
- assert(ret.isDefined == false)
+ assert(!ret)
}
}
}
@@ -1607,10 +1607,10 @@ abstract class DynamicPartitionPruningV1Suite extends
DynamicPartitionPruningDat
val scanOption =
find(plan) {
case s: FileSourceScanExec =>
- s.output.exists(_.find(_.argString(maxFields =
100).contains("fid")).isDefined)
+ s.output.exists(_.exists(_.argString(maxFields =
100).contains("fid")))
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
- s.output.exists(_.find(_.argString(maxFields =
100).contains("f1")).isDefined)
+ s.output.exists(_.exists(_.argString(maxFields =
100).contains("f1")))
case _ => false
}
assert(scanOption.isDefined)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index ec6c863..4a8421a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1074,8 +1074,8 @@ class JoinSuite extends QueryTest with SharedSparkSession
with AdaptiveSparkPlan
val df = left.crossJoin(right).where(pythonTestUDF(left("a")) ===
right.col("c"))
// Before optimization, there is a logical Filter operator.
- val filterInAnalysis =
df.queryExecution.analyzed.find(_.isInstanceOf[Filter])
- assert(filterInAnalysis.isDefined)
+ val filterInAnalysis =
df.queryExecution.analyzed.exists(_.isInstanceOf[Filter])
+ assert(filterInAnalysis)
// Filter predicate was pushdown as join condition. So there is no Filter
exec operator.
val filterExec =
find(df.queryExecution.executedPlan)(_.isInstanceOf[FilterExec])
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/ReplaceNullWithFalseInPredicateEndToEndSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/ReplaceNullWithFalseInPredicateEndToEndSuite.scala
index 739b405..8883e9b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/ReplaceNullWithFalseInPredicateEndToEndSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/ReplaceNullWithFalseInPredicateEndToEndSuite.scala
@@ -59,13 +59,13 @@ class ReplaceNullWithFalseInPredicateEndToEndSuite extends
QueryTest with Shared
val q5 = df1.selectExpr("IF(l > 1 AND null, 5, 1) AS out")
checkAnswer(q5, Row(1) :: Row(1) :: Nil)
q5.queryExecution.executedPlan.foreach { p =>
- assert(p.expressions.forall(e => e.find(_.isInstanceOf[If]).isEmpty))
+ assert(p.expressions.forall(e => !e.exists(_.isInstanceOf[If])))
}
val q6 = df1.selectExpr("CASE WHEN (l > 2 AND null) THEN 3 ELSE 2 END")
checkAnswer(q6, Row(2) :: Row(2) :: Nil)
q6.queryExecution.executedPlan.foreach { p =>
- assert(p.expressions.forall(e =>
e.find(_.isInstanceOf[CaseWhen]).isEmpty))
+ assert(p.expressions.forall(e => !e.exists(_.isInstanceOf[CaseWhen])))
}
checkAnswer(df1.where("IF(l > 10, false, b OR null)"), Row(1, true))
@@ -75,10 +75,10 @@ class ReplaceNullWithFalseInPredicateEndToEndSuite extends
QueryTest with Shared
test("SPARK-26107: Replace Literal(null, _) with FalseLiteral in
higher-order functions") {
def assertNoLiteralNullInPlan(df: DataFrame): Unit = {
df.queryExecution.executedPlan.foreach { p =>
- assert(p.expressions.forall(_.find {
+ assert(p.expressions.forall(!_.exists {
case Literal(null, BooleanType) => true
case _ => false
- }.isEmpty))
+ }))
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
index 5156bd4..cfc8b2c 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
@@ -184,7 +184,7 @@ class FileDataSourceV2FallBackSuite extends QueryTest with
SharedSparkSession {
val df = spark.read.format(format).load(path.getCanonicalPath)
checkAnswer(df, inputData.toDF())
assert(
-
df.queryExecution.executedPlan.find(_.isInstanceOf[FileSourceScanExec]).isDefined)
+
df.queryExecution.executedPlan.exists(_.isInstanceOf[FileSourceScanExec]))
}
} finally {
spark.listenerManager.unregister(listener)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/DeprecatedWholeStageCodegenSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/DeprecatedWholeStageCodegenSuite.scala
index b27a940..635c794 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/DeprecatedWholeStageCodegenSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/DeprecatedWholeStageCodegenSuite.scala
@@ -36,9 +36,9 @@ class DeprecatedWholeStageCodegenSuite extends QueryTest
.groupByKey(_._1).agg(typed.sum(_._2))
val plan = ds.queryExecution.executedPlan
- assert(plan.find(p =>
+ assert(plan.exists(p =>
p.isInstanceOf[WholeStageCodegenExec] &&
-
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
+
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]))
assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0)))
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index dfc1b70..c3c8959 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -221,7 +221,7 @@ class PlannerSuite extends SharedSparkSession with
AdaptiveSparkPlanHelper {
val query = testData.select(Symbol("key"), Symbol("value"))
.sort(Symbol("key")).limit(2).filter('key === 3)
val planned = query.queryExecution.executedPlan
- assert(planned.find(_.isInstanceOf[TakeOrderedAndProjectExec]).isDefined)
+ assert(planned.exists(_.isInstanceOf[TakeOrderedAndProjectExec]))
}
test("CollectLimit can appear in the middle of a plan when caching is used")
{
@@ -234,11 +234,11 @@ class PlannerSuite extends SharedSparkSession with
AdaptiveSparkPlanHelper {
withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1000") {
val query0 =
testData.select(Symbol("value")).orderBy(Symbol("key")).limit(100)
val planned0 = query0.queryExecution.executedPlan
-
assert(planned0.find(_.isInstanceOf[TakeOrderedAndProjectExec]).isDefined)
+ assert(planned0.exists(_.isInstanceOf[TakeOrderedAndProjectExec]))
val query1 =
testData.select(Symbol("value")).orderBy(Symbol("key")).limit(2000)
val planned1 = query1.queryExecution.executedPlan
- assert(planned1.find(_.isInstanceOf[TakeOrderedAndProjectExec]).isEmpty)
+ assert(!planned1.exists(_.isInstanceOf[TakeOrderedAndProjectExec]))
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
index 5e0318d..73c4e4c 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
@@ -84,7 +84,7 @@ object WholeStageCodegenSparkSubmitSuite extends Assertions
with Logging {
val df = spark.range(71773).select((col("id") % lit(10)).cast(IntegerType)
as "v")
.groupBy(array(col("v"))).agg(count(col("*")))
val plan = df.queryExecution.executedPlan
- assert(plan.find(_.isInstanceOf[WholeStageCodegenExec]).isDefined)
+ assert(plan.exists(_.isInstanceOf[WholeStageCodegenExec]))
val expectedAnswer =
Row(Array(0), 7178) ::
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index b5b6728..f0533f8 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -37,16 +37,16 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
test("range/filter should be combined") {
val df = spark.range(10).filter("id = 1").selectExpr("id + 1")
val plan = df.queryExecution.executedPlan
- assert(plan.find(_.isInstanceOf[WholeStageCodegenExec]).isDefined)
+ assert(plan.exists(_.isInstanceOf[WholeStageCodegenExec]))
assert(df.collect() === Array(Row(2)))
}
test("HashAggregate should be included in WholeStageCodegen") {
val df = spark.range(10).groupBy().agg(max(col("id")), avg(col("id")))
val plan = df.queryExecution.executedPlan
- assert(plan.find(p =>
+ assert(plan.exists(p =>
p.isInstanceOf[WholeStageCodegenExec] &&
-
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
+
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]))
assert(df.collect() === Array(Row(9, 4.5)))
}
@@ -54,9 +54,9 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
val df = spark.range(10).agg(max(col("id")), avg(col("id")))
withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true") {
val plan = df.queryExecution.executedPlan
- assert(plan.find(p =>
+ assert(plan.exists(p =>
p.isInstanceOf[WholeStageCodegenExec] &&
-
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortAggregateExec]).isDefined)
+
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortAggregateExec]))
assert(df.collect() === Array(Row(9, 4.5)))
}
}
@@ -70,22 +70,22 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
// Array - explode
var expDF = df.select($"name", explode($"knownLanguages"), $"properties")
var plan = expDF.queryExecution.executedPlan
- assert(plan.find {
+ assert(plan.exists {
case stage: WholeStageCodegenExec =>
- stage.find(_.isInstanceOf[GenerateExec]).isDefined
+ stage.exists(_.isInstanceOf[GenerateExec])
case _ => !codegenEnabled.toBoolean
- }.isDefined)
+ })
checkAnswer(expDF, Array(Row("James", "Java", Map("hair" -> "black", "eye"
-> "brown")),
Row("James", "Scala", Map("hair" -> "black", "eye" -> "brown"))))
// Map - explode
expDF = df.select($"name", $"knownLanguages", explode($"properties"))
plan = expDF.queryExecution.executedPlan
- assert(plan.find {
+ assert(plan.exists {
case stage: WholeStageCodegenExec =>
- stage.find(_.isInstanceOf[GenerateExec]).isDefined
+ stage.exists(_.isInstanceOf[GenerateExec])
case _ => !codegenEnabled.toBoolean
- }.isDefined)
+ })
checkAnswer(expDF,
Array(Row("James", List("Java", "Scala"), "hair", "black"),
Row("James", List("Java", "Scala"), "eye", "brown")))
@@ -93,33 +93,33 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
// Array - posexplode
expDF = df.select($"name", posexplode($"knownLanguages"))
plan = expDF.queryExecution.executedPlan
- assert(plan.find {
+ assert(plan.exists {
case stage: WholeStageCodegenExec =>
- stage.find(_.isInstanceOf[GenerateExec]).isDefined
+ stage.exists(_.isInstanceOf[GenerateExec])
case _ => !codegenEnabled.toBoolean
- }.isDefined)
+ })
checkAnswer(expDF,
Array(Row("James", 0, "Java"), Row("James", 1, "Scala")))
// Map - posexplode
expDF = df.select($"name", posexplode($"properties"))
plan = expDF.queryExecution.executedPlan
- assert(plan.find {
+ assert(plan.exists {
case stage: WholeStageCodegenExec =>
- stage.find(_.isInstanceOf[GenerateExec]).isDefined
+ stage.exists(_.isInstanceOf[GenerateExec])
case _ => !codegenEnabled.toBoolean
- }.isDefined)
+ })
checkAnswer(expDF,
Array(Row("James", 0, "hair", "black"), Row("James", 1, "eye", "brown")))
// Array - explode , selecting all columns
expDF = df.select($"*", explode($"knownLanguages"))
plan = expDF.queryExecution.executedPlan
- assert(plan.find {
+ assert(plan.exists {
case stage: WholeStageCodegenExec =>
- stage.find(_.isInstanceOf[GenerateExec]).isDefined
+ stage.exists(_.isInstanceOf[GenerateExec])
case _ => !codegenEnabled.toBoolean
- }.isDefined)
+ })
checkAnswer(expDF,
Array(Row("James", Seq("Java", "Scala"), Map("hair" -> "black", "eye" ->
"brown"), "Java"),
Row("James", Seq("Java", "Scala"), Map("hair" -> "black", "eye" ->
"brown"), "Scala")))
@@ -127,11 +127,11 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
// Map - explode, selecting all columns
expDF = df.select($"*", explode($"properties"))
plan = expDF.queryExecution.executedPlan
- assert(plan.find {
+ assert(plan.exists {
case stage: WholeStageCodegenExec =>
- stage.find(_.isInstanceOf[GenerateExec]).isDefined
+ stage.exists(_.isInstanceOf[GenerateExec])
case _ => !codegenEnabled.toBoolean
- }.isDefined)
+ })
checkAnswer(expDF,
Array(
Row("James", List("Java", "Scala"),
@@ -143,9 +143,9 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
test("HashAggregate with grouping keys should be included in
WholeStageCodegen") {
val df = spark.range(3).groupBy(col("id") * 2).count().orderBy(col("id") *
2)
val plan = df.queryExecution.executedPlan
- assert(plan.find(p =>
+ assert(plan.exists(p =>
p.isInstanceOf[WholeStageCodegenExec] &&
-
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
+
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]))
assert(df.collect() === Array(Row(0, 1), Row(2, 1), Row(4, 1)))
}
@@ -154,9 +154,9 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
val schema = new StructType().add("k", IntegerType).add("v", StringType)
val smallDF = spark.createDataFrame(rdd, schema)
val df = spark.range(10).join(broadcast(smallDF), col("k") === col("id"))
- assert(df.queryExecution.executedPlan.find(p =>
+ assert(df.queryExecution.executedPlan.exists(p =>
p.isInstanceOf[WholeStageCodegenExec] &&
-
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[BroadcastHashJoinExec]))
assert(df.collect() === Array(Row(1, 1, "1"), Row(1, 1, "1"), Row(2, 2,
"2")))
}
@@ -434,9 +434,9 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
test("Sort should be included in WholeStageCodegen") {
val df = spark.range(3, 0, -1).toDF().sort(col("id"))
val plan = df.queryExecution.executedPlan
- assert(plan.find(p =>
+ assert(plan.exists(p =>
p.isInstanceOf[WholeStageCodegenExec] &&
-
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortExec]).isDefined)
+ p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortExec]))
assert(df.collect() === Array(Row(1), Row(2), Row(3)))
}
@@ -445,27 +445,27 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
val ds = spark.range(10).map(_.toString)
val plan = ds.queryExecution.executedPlan
- assert(plan.find(p =>
+ assert(plan.exists(p =>
p.isInstanceOf[WholeStageCodegenExec] &&
-
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SerializeFromObjectExec]).isDefined)
+
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SerializeFromObjectExec]))
assert(ds.collect() === 0.until(10).map(_.toString).toArray)
}
test("typed filter should be included in WholeStageCodegen") {
val ds = spark.range(10).filter(_ % 2 == 0)
val plan = ds.queryExecution.executedPlan
- assert(plan.find(p =>
+ assert(plan.exists(p =>
p.isInstanceOf[WholeStageCodegenExec] &&
-
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[FilterExec]).isDefined)
+ p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[FilterExec]))
assert(ds.collect() === Array(0, 2, 4, 6, 8))
}
test("back-to-back typed filter should be included in WholeStageCodegen") {
val ds = spark.range(10).filter(_ % 2 == 0).filter(_ % 3 == 0)
val plan = ds.queryExecution.executedPlan
- assert(plan.find(p =>
+ assert(plan.exists(p =>
p.isInstanceOf[WholeStageCodegenExec] &&
-
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[FilterExec]).isDefined)
+ p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[FilterExec]))
assert(ds.collect() === Array(0, 6))
}
@@ -517,10 +517,10 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
.select("int")
val plan = df.queryExecution.executedPlan
- assert(plan.find(p =>
+ assert(!plan.exists(p =>
p.isInstanceOf[WholeStageCodegenExec] &&
p.asInstanceOf[WholeStageCodegenExec].child.children(0)
- .isInstanceOf[SortMergeJoinExec]).isEmpty)
+ .isInstanceOf[SortMergeJoinExec]))
assert(df.collect() === Array(Row(1), Row(2)))
}
}
@@ -639,9 +639,9 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
val df = spark.range(100)
val join = df.join(df, "id")
val plan = join.queryExecution.executedPlan
- assert(plan.find(p =>
+ assert(!plan.exists(p =>
p.isInstanceOf[WholeStageCodegenExec] &&
- p.asInstanceOf[WholeStageCodegenExec].codegenStageId == 0).isEmpty,
+ p.asInstanceOf[WholeStageCodegenExec].codegenStageId == 0),
"codegen stage IDs should be preserved through ReuseExchange")
checkAnswer(join, df.toDF)
}
@@ -740,11 +740,11 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
// HashAggregateExec supports WholeStageCodegen and it's the parent of
// LocalTableScanExec so LocalTableScanExec should be within a
WholeStageCodegen domain.
assert(
- executedPlan.find {
+ executedPlan.exists {
case WholeStageCodegenExec(
HashAggregateExec(_, _, _, _, _, _, _: LocalTableScanExec)) => true
case _ => false
- }.isDefined,
+ },
"LocalTableScanExec should be within a WholeStageCodegen domain.")
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
index 849c413..787fdc7 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
@@ -44,7 +44,7 @@ object JoinBenchmark extends SqlBasedBenchmark {
val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as
string) as v"))
codegenBenchmark("Join w long", N) {
val df = spark.range(N).join(dim, (col("id") % M) === col("k"))
-
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[BroadcastHashJoinExec]))
df.noop()
}
}
@@ -55,7 +55,7 @@ object JoinBenchmark extends SqlBasedBenchmark {
val dim = broadcast(spark.range(M).selectExpr("cast(id/10 as long) as k"))
codegenBenchmark("Join w long duplicated", N) {
val df = spark.range(N).join(dim, (col("id") % M) === col("k"))
-
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[BroadcastHashJoinExec]))
df.noop()
}
}
@@ -70,7 +70,7 @@ object JoinBenchmark extends SqlBasedBenchmark {
val df = spark.range(N).join(dim2,
(col("id") % M).cast(IntegerType) === col("k1")
&& (col("id") % M).cast(IntegerType) === col("k2"))
-
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[BroadcastHashJoinExec]))
df.noop()
}
}
@@ -84,7 +84,7 @@ object JoinBenchmark extends SqlBasedBenchmark {
codegenBenchmark("Join w 2 longs", N) {
val df = spark.range(N).join(dim3,
(col("id") % M) === col("k1") && (col("id") % M) === col("k2"))
-
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[BroadcastHashJoinExec]))
df.noop()
}
}
@@ -98,7 +98,7 @@ object JoinBenchmark extends SqlBasedBenchmark {
codegenBenchmark("Join w 2 longs duplicated", N) {
val df = spark.range(N).join(dim4,
(col("id") bitwiseAND M) === col("k1") && (col("id") bitwiseAND M) ===
col("k2"))
-
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[BroadcastHashJoinExec]))
df.noop()
}
}
@@ -109,7 +109,7 @@ object JoinBenchmark extends SqlBasedBenchmark {
val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as
string) as v"))
codegenBenchmark("outer join w long", N) {
val df = spark.range(N).join(dim, (col("id") % M) === col("k"), "left")
-
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[BroadcastHashJoinExec]))
df.noop()
}
}
@@ -120,7 +120,7 @@ object JoinBenchmark extends SqlBasedBenchmark {
val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as
string) as v"))
codegenBenchmark("semi join w long", N) {
val df = spark.range(N).join(dim, (col("id") % M) === col("k"),
"leftsemi")
-
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[BroadcastHashJoinExec]))
df.noop()
}
}
@@ -131,7 +131,7 @@ object JoinBenchmark extends SqlBasedBenchmark {
val df1 = spark.range(N).selectExpr(s"id * 2 as k1")
val df2 = spark.range(N).selectExpr(s"id * 3 as k2")
val df = df1.join(df2, col("k1") === col("k2"))
-
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
+
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[SortMergeJoinExec]))
df.noop()
}
}
@@ -144,7 +144,7 @@ object JoinBenchmark extends SqlBasedBenchmark {
val df2 = spark.range(N)
.selectExpr(s"(id * 15485867) % ${N*10} as k2")
val df = df1.join(df2, col("k1") === col("k2"))
-
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
+
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[SortMergeJoinExec]))
df.noop()
}
}
@@ -159,7 +159,7 @@ object JoinBenchmark extends SqlBasedBenchmark {
val df1 = spark.range(N).selectExpr(s"id as k1")
val df2 = spark.range(N / 3).selectExpr(s"id * 3 as k2")
val df = df1.join(df2, col("k1") === col("k2"))
-
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined)
+
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[ShuffledHashJoinExec]))
df.noop()
}
}
@@ -172,8 +172,7 @@ object JoinBenchmark extends SqlBasedBenchmark {
val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as
string) as v"))
codegenBenchmark("broadcast nested loop join", N) {
val df = spark.range(N).join(dim)
- assert(df.queryExecution.sparkPlan.find(
- _.isInstanceOf[BroadcastNestedLoopJoinExec]).isDefined)
+
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[BroadcastNestedLoopJoinExec]))
df.noop()
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 132b8f9..0897ad2 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1353,12 +1353,12 @@ abstract class JsonSuite
}
test("Dataset toJSON doesn't construct rdd") {
- val containsRDD = spark.emptyDataFrame.toJSON.queryExecution.logical.find {
+ val containsRDDExists =
spark.emptyDataFrame.toJSON.queryExecution.logical.exists {
case ExternalRDD(_, _) => true
case _ => false
}
- assert(containsRDD.isEmpty, "Expected logical plan of toJSON to not
contain an RDD")
+ assert(!containsRDDExists, "Expected logical plan of toJSON to not contain
an RDD")
}
test("JSONRelation equality test") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index 280a880..7f3809d 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -734,10 +734,10 @@ abstract class OrcQuerySuite extends OrcQueryTest with
SharedSparkSession {
withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key ->
"true") {
val readDf = spark.read.orc(path)
- val vectorizationEnabled = readDf.queryExecution.executedPlan.find {
+ val vectorizationEnabled = readDf.queryExecution.executedPlan.exists {
case scan @ (_: FileSourceScanExec | _: BatchScanExec) =>
scan.supportsColumnar
case _ => false
- }.isDefined
+ }
assert(vectorizationEnabled)
checkAnswer(readDf, df)
}
@@ -756,10 +756,10 @@ abstract class OrcQuerySuite extends OrcQueryTest with
SharedSparkSession {
withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key ->
"true") {
val readDf = spark.read.orc(path)
- val vectorizationEnabled = readDf.queryExecution.executedPlan.find {
+ val vectorizationEnabled = readDf.queryExecution.executedPlan.exists {
case scan @ (_: FileSourceScanExec | _: BatchScanExec) =>
scan.supportsColumnar
case _ => false
- }.isDefined
+ }
assert(vectorizationEnabled)
checkAnswer(readDf, df)
}
@@ -783,10 +783,10 @@ abstract class OrcQuerySuite extends OrcQueryTest with
SharedSparkSession {
withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key
-> "true",
SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> maxNumFields) {
val scanPlan = spark.read.orc(path).queryExecution.executedPlan
- assert(scanPlan.find {
+ assert(scanPlan.exists {
case scan @ (_: FileSourceScanExec | _: BatchScanExec) =>
scan.supportsColumnar
case _ => false
- }.isDefined == vectorizedEnabled)
+ } == vectorizedEnabled)
}
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
index a0bd0fb..ce98e2e 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
@@ -160,9 +160,9 @@ class ForeachBatchSinkSuite extends StreamTest {
var planAsserted = false
val writer: (Dataset[T], Long) => Unit = { case (df, _) =>
- assert(df.queryExecution.executedPlan.find { p =>
+ assert(!df.queryExecution.executedPlan.exists { p =>
p.isInstanceOf[SerializeFromObjectExec]
- }.isEmpty, "Untyped Dataset should not introduce serialization on
object!")
+ }, "Untyped Dataset should not introduce serialization on object!")
planAsserted = true
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index f4de713..18039db 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -463,18 +463,18 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
// check existence of shuffle
assert(
-
joinOperator.left.find(_.isInstanceOf[ShuffleExchangeExec]).isDefined ==
shuffleLeft,
+ joinOperator.left.exists(_.isInstanceOf[ShuffleExchangeExec]) ==
shuffleLeft,
s"expected shuffle in plan to be $shuffleLeft but
found\n${joinOperator.left}")
assert(
-
joinOperator.right.find(_.isInstanceOf[ShuffleExchangeExec]).isDefined ==
shuffleRight,
+ joinOperator.right.exists(_.isInstanceOf[ShuffleExchangeExec]) ==
shuffleRight,
s"expected shuffle in plan to be $shuffleRight but
found\n${joinOperator.right}")
// check existence of sort
assert(
- joinOperator.left.find(_.isInstanceOf[SortExec]).isDefined ==
sortLeft,
+ joinOperator.left.exists(_.isInstanceOf[SortExec]) == sortLeft,
s"expected sort in the left child to be $sortLeft but
found\n${joinOperator.left}")
assert(
- joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined ==
sortRight,
+ joinOperator.right.exists(_.isInstanceOf[SortExec]) == sortRight,
s"expected sort in the right child to be $sortRight but
found\n${joinOperator.right}")
// check the output partitioning
@@ -678,7 +678,7 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
df1.groupBy("i", "j").agg(max("k")).sort("i", "j"))
assert(
-
aggregated.queryExecution.executedPlan.find(_.isInstanceOf[ShuffleExchangeExec]).isEmpty)
+
!aggregated.queryExecution.executedPlan.exists(_.isInstanceOf[ShuffleExchangeExec]))
}
}
@@ -719,7 +719,7 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
df1.groupBy("i", "j").agg(max("k")).sort("i", "j"))
assert(
-
aggregated.queryExecution.executedPlan.find(_.isInstanceOf[ShuffleExchangeExec]).isEmpty)
+
!aggregated.queryExecution.executedPlan.exists(_.isInstanceOf[ShuffleExchangeExec]))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]