This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 216b972  [SPARK-38360][SQL][SS][PYTHON] Introduce an `exists` function 
for `TreeNode` to eliminate duplicate code patterns
216b972 is described below

commit 216b97220d6a6830c8e99c385ae7a06e805a5ae8
Author: yangjie01 <[email protected]>
AuthorDate: Thu Mar 10 21:05:20 2022 +0800

    [SPARK-38360][SQL][SS][PYTHON] Introduce an `exists` function for `TreeNode` 
to eliminate duplicate code patterns
    
    ### What changes were proposed in this pull request?
    There are many duplicate code patterns in Spark code:
    
    ```scala
    treeNode.find(condition).isDefined
    treeNode.find(condition).nonEmpty
    treeNode.find(condition).isEmpty
    ```
    
    This PR introduces an `exists` function for `TreeNode` to simplify them; 
after this PR:
    
    - `treeNode.find(condition).isDefined` and 
`treeNode.find(condition).nonEmpty` -> `treeNode.exists(condition)`
    - `treeNode.find(condition).isEmpty` -> `!treeNode.exists(condition)`
    
    ### Why are the changes needed?
    Code simplification
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Passes GA and adds a new UT for the new function.
    
    Closes #35694 from LuciferYang/treenode-exists.
    
    Authored-by: yangjie01 <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  |  4 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 30 ++++----
 .../sql/catalyst/analysis/CTESubstitution.scala    |  4 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  4 +-
 .../expressions/EquivalentExpressions.scala        |  2 +-
 .../expressions/aggregate/interfaces.scala         |  2 +-
 .../spark/sql/catalyst/expressions/subquery.scala  | 20 +++---
 .../catalyst/optimizer/DecorrelateInnerQuery.scala |  4 +-
 .../spark/sql/catalyst/optimizer/InlineCTE.scala   |  2 +-
 .../catalyst/optimizer/NestedColumnAliasing.scala  |  6 +-
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  7 +-
 .../optimizer/RemoveRedundantAggregates.scala      |  4 +-
 .../optimizer/ReplaceExceptWithFilter.scala        |  4 +-
 .../spark/sql/catalyst/optimizer/joins.scala       |  6 +-
 .../spark/sql/catalyst/optimizer/subquery.scala    |  2 +-
 .../plans/logical/basicLogicalOperators.scala      |  2 +-
 .../apache/spark/sql/catalyst/trees/TreeNode.scala | 10 +++
 .../catalyst/analysis/AnsiTypeCoercionSuite.scala  |  2 +-
 .../sql/catalyst/analysis/TypeCoercionSuite.scala  |  2 +-
 .../optimizer/DecorrelateInnerQuerySuite.scala     |  2 +-
 .../plans/logical/AnalysisHelperSuite.scala        |  2 +-
 .../spark/sql/catalyst/trees/TreeNodeSuite.scala   | 38 ++++++++++
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  4 +-
 .../apache/spark/sql/execution/CacheManager.scala  |  8 +--
 .../spark/sql/execution/DataSourceScanExec.scala   |  2 +-
 .../sql/execution/WholeStageCodegenExec.scala      |  2 +-
 .../sql/execution/adaptive/AQEOptimizer.scala      |  2 +-
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |  4 +-
 .../adaptive/InsertAdaptiveSparkPlan.scala         |  8 +--
 .../analysis/DetectAmbiguousSelfJoin.scala         |  2 +-
 .../bucketing/DisableUnnecessaryBucketedScan.scala |  4 +-
 .../dynamicpruning/PartitionPruning.scala          |  4 +-
 .../dynamicpruning/PlanDynamicPruningFilters.scala |  4 +-
 .../execution/python/AggregateInPandasExec.scala   |  2 +-
 .../sql/execution/python/EvalPythonExec.scala      |  2 +-
 .../sql/execution/python/ExtractPythonUDFs.scala   | 10 +--
 .../sql/execution/python/WindowInPandasExec.scala  |  2 +-
 .../org/apache/spark/sql/execution/subquery.scala  |  4 +-
 .../org/apache/spark/sql/CTEInlineSuite.scala      |  8 +--
 .../spark/sql/DataFrameWindowFunctionsSuite.scala  |  8 +--
 .../org/apache/spark/sql/DatasetCacheSuite.scala   |  2 +-
 .../spark/sql/DynamicPartitionPruningSuite.scala   | 18 ++---
 .../scala/org/apache/spark/sql/JoinSuite.scala     |  4 +-
 ...laceNullWithFalseInPredicateEndToEndSuite.scala |  8 +--
 .../connector/FileDataSourceV2FallBackSuite.scala  |  2 +-
 .../DeprecatedWholeStageCodegenSuite.scala         |  4 +-
 .../apache/spark/sql/execution/PlannerSuite.scala  |  6 +-
 .../WholeStageCodegenSparkSubmitSuite.scala        |  2 +-
 .../sql/execution/WholeStageCodegenSuite.scala     | 82 +++++++++++-----------
 .../sql/execution/benchmark/JoinBenchmark.scala    | 23 +++---
 .../sql/execution/datasources/json/JsonSuite.scala |  4 +-
 .../execution/datasources/orc/OrcQuerySuite.scala  | 12 ++--
 .../streaming/sources/ForeachBatchSinkSuite.scala  |  4 +-
 .../spark/sql/sources/BucketedReadSuite.scala      | 12 ++--
 54 files changed, 234 insertions(+), 188 deletions(-)

diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 5037af1..db71f0f 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -1416,10 +1416,10 @@ class KafkaMicroBatchV2SourceSuite extends 
KafkaMicroBatchSourceSuiteBase {
     testStream(kafka)(
       makeSureGetOffsetCalled,
       AssertOnQuery { query =>
-        query.logicalPlan.find {
+        query.logicalPlan.exists {
           case r: StreamingDataSourceV2Relation => 
r.stream.isInstanceOf[KafkaMicroBatchStream]
           case _ => false
-        }.isDefined
+        }
       }
     )
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 27e8ed2..21454be 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -509,7 +509,7 @@ class Analyzer(override val catalogManager: CatalogManager)
     }
 
     private def hasUnresolvedAlias(exprs: Seq[NamedExpression]) =
-      exprs.exists(_.find(_.isInstanceOf[UnresolvedAlias]).isDefined)
+      exprs.exists(_.exists(_.isInstanceOf[UnresolvedAlias]))
 
     def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
       _.containsPattern(UNRESOLVED_ALIAS), ruleId) {
@@ -616,7 +616,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       val aggsBuffer = ArrayBuffer[Expression]()
       // Returns whether the expression belongs to any expressions in 
`aggsBuffer` or not.
       def isPartOfAggregation(e: Expression): Boolean = {
-        aggsBuffer.exists(a => a.find(_ eq e).isDefined)
+        aggsBuffer.exists(a => a.exists(_ eq e))
       }
       replaceGroupingFunc(agg, groupByExprs, gid).transformDown {
         // AggregateExpression should be computed on the unmodified value of 
its argument
@@ -966,14 +966,14 @@ class Analyzer(override val catalogManager: 
CatalogManager)
     }
 
     private def hasMetadataCol(plan: LogicalPlan): Boolean = {
-      plan.expressions.exists(_.find {
+      plan.expressions.exists(_.exists {
         case a: Attribute =>
           // If an attribute is resolved before being labeled as metadata
           // (i.e. from the originating Dataset), we check with expression ID
           a.isMetadataCol ||
             plan.children.exists(c => c.metadataOutput.exists(_.exprId == 
a.exprId))
         case _ => false
-      }.isDefined)
+      })
     }
 
     private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match {
@@ -1674,7 +1674,7 @@ class Analyzer(override val catalogManager: 
CatalogManager)
   }
 
   private def containsDeserializer(exprs: Seq[Expression]): Boolean = {
-    exprs.exists(_.find(_.isInstanceOf[UnresolvedDeserializer]).isDefined)
+    exprs.exists(_.exists(_.isInstanceOf[UnresolvedDeserializer]))
   }
 
   // support CURRENT_DATE, CURRENT_TIMESTAMP, and grouping__id
@@ -1869,7 +1869,7 @@ class Analyzer(override val catalogManager: 
CatalogManager)
         withPosition(ordinal) {
           if (index > 0 && index <= aggs.size) {
             val ordinalExpr = aggs(index - 1)
-            if 
(ordinalExpr.find(_.isInstanceOf[AggregateExpression]).nonEmpty) {
+            if (ordinalExpr.exists(_.isInstanceOf[AggregateExpression])) {
               throw 
QueryCompilationErrors.groupByPositionRefersToAggregateFunctionError(
                 index, ordinalExpr)
             } else {
@@ -2687,7 +2687,7 @@ class Analyzer(override val catalogManager: 
CatalogManager)
    */
   object ExtractGenerator extends Rule[LogicalPlan] {
     private def hasGenerator(expr: Expression): Boolean = {
-      expr.find(_.isInstanceOf[Generator]).isDefined
+      expr.exists(_.isInstanceOf[Generator])
     }
 
     private def hasNestedGenerator(expr: NamedExpression): Boolean = {
@@ -2697,10 +2697,10 @@ class Analyzer(override val catalogManager: 
CatalogManager)
         case go: GeneratorOuter =>
           hasInnerGenerator(go.child)
         case _ =>
-          g.children.exists { _.find {
+          g.children.exists { _.exists {
             case _: Generator => true
             case _ => false
-          }.isDefined }
+          } }
       }
       trimNonTopLevelAliases(expr) match {
         case UnresolvedAlias(g: Generator, _) => hasInnerGenerator(g)
@@ -2711,12 +2711,12 @@ class Analyzer(override val catalogManager: 
CatalogManager)
     }
 
     private def hasAggFunctionInGenerator(ne: Seq[NamedExpression]): Boolean = 
{
-      ne.exists(_.find {
+      ne.exists(_.exists {
         case g: Generator =>
-          
g.children.exists(_.find(_.isInstanceOf[AggregateFunction]).isDefined)
+          g.children.exists(_.exists(_.isInstanceOf[AggregateFunction]))
         case _ =>
           false
-      }.nonEmpty)
+      })
     }
 
     private def trimAlias(expr: NamedExpression): Expression = expr match {
@@ -2917,10 +2917,10 @@ class Analyzer(override val catalogManager: 
CatalogManager)
       exprs.exists(hasWindowFunction)
 
     private def hasWindowFunction(expr: Expression): Boolean = {
-      expr.find {
+      expr.exists {
         case window: WindowExpression => true
         case _ => false
-      }.isDefined
+      }
     }
 
     /**
@@ -3756,7 +3756,7 @@ class Analyzer(override val catalogManager: 
CatalogManager)
     }
 
     private def hasUnresolvedFieldName(a: AlterTableCommand): Boolean = {
-      
a.expressions.exists(_.find(_.isInstanceOf[UnresolvedFieldName]).isDefined)
+      a.expressions.exists(_.exists(_.isInstanceOf[UnresolvedFieldName]))
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
index 2397527..c0ba359 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
@@ -51,10 +51,10 @@ object CTESubstitution extends Rule[LogicalPlan] {
     if (!plan.containsPattern(UNRESOLVED_WITH)) {
       return plan
     }
-    val isCommand = plan.find {
+    val isCommand = plan.exists {
       case _: Command | _: ParsedStatement | _: InsertIntoDir => true
       case _ => false
-    }.isDefined
+    }
     val cteDefs = mutable.ArrayBuffer.empty[CTERelationDef]
     val (substituted, lastSubstituted) =
       
LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match 
{
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 7319248..c05b932 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -334,7 +334,7 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog {
             }
 
             def checkValidGroupingExprs(expr: Expression): Unit = {
-              if (expr.find(_.isInstanceOf[AggregateExpression]).isDefined) {
+              if (expr.exists(_.isInstanceOf[AggregateExpression])) {
                 failAnalysis(
                   "aggregate functions are not allowed in GROUP BY, but found 
" + expr.sql)
               }
@@ -718,7 +718,7 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog {
 
     // Check whether the given expressions contains the subquery expression.
     def containsExpr(expressions: Seq[Expression]): Boolean = {
-      expressions.exists(_.find(_.semanticEquals(expr)).isDefined)
+      expressions.exists(_.exists(_.semanticEquals(expr)))
     }
 
     // Validate the subquery plan.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
index 59e2be4..903a6fd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
@@ -194,7 +194,7 @@ class EquivalentExpressions {
       expr.isInstanceOf[LeafExpression] ||
       // `LambdaVariable` is usually used as a loop variable, which can't be 
evaluated ahead of the
       // loop. So we can't evaluate sub-expressions containing 
`LambdaVariable` at the beginning.
-      expr.find(_.isInstanceOf[LambdaVariable]).isDefined ||
+      expr.exists(_.isInstanceOf[LambdaVariable]) ||
       // `PlanExpression` wraps query plan. To compare query plans of 
`PlanExpression` on executor,
       // can cause error like NPE.
       (expr.isInstanceOf[PlanExpression[_]] && Utils.isInRunningSparkTask)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
index 3ba9065..f97293d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
@@ -83,7 +83,7 @@ object AggregateExpression {
   }
 
   def containsAggregate(expr: Expression): Boolean = {
-    expr.find(isAggregate).isDefined
+    expr.exists(isAggregate)
   }
 
   def isAggregate(expr: Expression): Boolean = {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
index d7112a2..71b36fa 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
@@ -88,11 +88,11 @@ object SubqueryExpression {
    * and false otherwise.
    */
   def hasInOrCorrelatedExistsSubquery(e: Expression): Boolean = {
-    e.find {
+    e.exists {
       case _: ListQuery => true
       case ex: Exists => ex.isCorrelated
       case _ => false
-    }.isDefined
+    }
   }
 
   /**
@@ -101,20 +101,20 @@ object SubqueryExpression {
    * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
    */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
-    e.find {
+    e.exists {
       case s: SubqueryExpression => s.isCorrelated
       case _ => false
-    }.isDefined
+    }
   }
 
   /**
    * Returns true when an expression contains a subquery
    */
   def hasSubquery(e: Expression): Boolean = {
-    e.find {
+    e.exists {
       case _: SubqueryExpression => true
       case _ => false
-    }.isDefined
+    }
   }
 }
 
@@ -124,7 +124,7 @@ object SubExprUtils extends PredicateHelper {
    * returns false otherwise.
    */
   def containsOuter(e: Expression): Boolean = {
-    e.find(_.isInstanceOf[OuterReference]).isDefined
+    e.exists(_.isInstanceOf[OuterReference])
   }
 
   /**
@@ -161,7 +161,7 @@ object SubExprUtils extends PredicateHelper {
    * Given a logical plan, returns TRUE if it has an outer reference and false 
otherwise.
    */
   def hasOuterReferences(plan: LogicalPlan): Boolean = {
-    plan.find(_.expressions.exists(containsOuter)).isDefined
+    plan.exists(_.expressions.exists(containsOuter))
   }
 
   /**
@@ -282,10 +282,10 @@ case class ScalarSubquery(
 
 object ScalarSubquery {
   def hasCorrelatedScalarSubquery(e: Expression): Boolean = {
-    e.find {
+    e.exists {
       case s: ScalarSubquery => s.isCorrelated
       case _ => false
-    }.isDefined
+    }
   }
 }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
index 5ad70f0..9a4d1a3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
@@ -87,7 +87,7 @@ object DecorrelateInnerQuery extends PredicateHelper {
    * leaf node and will not be found here.
    */
   private def containsAttribute(expression: Expression): Boolean = {
-    expression.find(_.isInstanceOf[Attribute]).isDefined
+    expression.exists(_.isInstanceOf[Attribute])
   }
 
   /**
@@ -268,7 +268,7 @@ object DecorrelateInnerQuery extends PredicateHelper {
           // The decorrelation framework adds domain inner joins by traversing 
down the plan tree
           // recursively until it reaches a node that is not correlated with 
the outer query.
           // So the child node of a domain inner join shouldn't contain 
another domain join.
-          assert(child.find(_.isInstanceOf[DomainJoin]).isEmpty,
+          assert(!child.exists(_.isInstanceOf[DomainJoin]),
             s"Child of a domain inner join shouldn't contain another domain 
join.\n$child")
           child
         case o =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala
index 1de300e..61577b1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala
@@ -54,7 +54,7 @@ object InlineCTE extends Rule[LogicalPlan] {
     // 2) Any `CTERelationRef` that contains `OuterReference` would have been 
inlined first.
     refCount == 1 ||
       cteDef.deterministic ||
-      
cteDef.child.find(_.expressions.exists(_.isInstanceOf[OuterReference])).isDefined
+      cteDef.child.exists(_.expressions.exists(_.isInstanceOf[OuterReference]))
   }
 
   private def buildCTEMap(
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
index a2ee950..4c7130e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
@@ -247,7 +247,7 @@ object NestedColumnAliasing {
     exprList.foreach { e =>
       collectRootReferenceAndExtractValue(e).foreach {
         // we can not alias the attr from lambda variable whose expr id is not 
available
-        case ev: ExtractValue if 
ev.find(_.isInstanceOf[NamedLambdaVariable]).isEmpty =>
+        case ev: ExtractValue if 
!ev.exists(_.isInstanceOf[NamedLambdaVariable]) =>
           if (ev.references.size == 1) {
             nestedFieldReferences.append(ev)
           }
@@ -267,7 +267,7 @@ object NestedColumnAliasing {
         // that do should not have an alias generated as it can lead to 
pushing the aggregate down
         // into a projection.
         def containsAggregateFunction(ev: ExtractValue): Boolean =
-          ev.find(_.isInstanceOf[AggregateFunction]).isDefined
+          ev.exists(_.isInstanceOf[AggregateFunction])
 
         // Remove redundant [[ExtractValue]]s if they share the same parent 
nest field.
         // For example, when `a.b` and `a.b.c` are in project list, we only 
need to alias `a.b`.
@@ -277,7 +277,7 @@ object NestedColumnAliasing {
           // [[GetStructField]]
           case e @ (_: GetStructField | _: GetArrayStructFields) =>
             val child = e.children.head
-            nestedFields.forall(f => child.find(_.semanticEquals(f)).isEmpty)
+            nestedFields.forall(f => !child.exists(_.semanticEquals(f)))
           case _ => true
         }
           .distinct
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f7ff566..e245d14 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -52,7 +52,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
       previousPlan: LogicalPlan,
       currentPlan: LogicalPlan): Boolean = {
     !Utils.isTesting || (currentPlan.resolved &&
-      
currentPlan.find(PlanHelper.specialExpressionsInUnsupportedOperator(_).nonEmpty).isEmpty
 &&
+      
!currentPlan.exists(PlanHelper.specialExpressionsInUnsupportedOperator(_).nonEmpty)
 &&
       LogicalPlanIntegrity.checkIfExprIdsAreGloballyUnique(currentPlan) &&
       DataType.equalsIgnoreNullability(previousPlan.schema, 
currentPlan.schema))
   }
@@ -1690,11 +1690,10 @@ object PushPredicateThroughNonJoin extends 
Rule[LogicalPlan] with PredicateHelpe
    */
   private def canPushThroughCondition(plan: LogicalPlan, condition: 
Expression): Boolean = {
     val attributes = plan.outputSet
-    val matched = condition.find {
+    !condition.exists {
       case s: SubqueryExpression => 
s.plan.outputSet.intersect(attributes).nonEmpty
       case _ => false
     }
-    matched.isEmpty
   }
 }
 
@@ -1956,7 +1955,7 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {
   }
 
   private def hasUnevaluableExpr(expr: Expression): Boolean = {
-    expr.find(e => e.isInstanceOf[Unevaluable] && 
!e.isInstanceOf[AttributeReference]).isDefined
+    expr.exists(e => e.isInstanceOf[Unevaluable] && 
!e.isInstanceOf[AttributeReference])
   }
 }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
index bf17791..beec90d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
@@ -52,10 +52,10 @@ object RemoveRedundantAggregates extends Rule[LogicalPlan] 
with AliasHelper {
   private def isLowerRedundant(upper: Aggregate, lower: Aggregate): Boolean = {
     val upperHasNoDuplicateSensitiveAgg = upper
       .aggregateExpressions
-      .forall(expr => expr.find {
+      .forall(expr => !expr.exists {
         case ae: AggregateExpression => isDuplicateSensitive(ae)
         case e => AggregateExpression.isAggregate(e)
-      }.isEmpty)
+      })
 
     lazy val upperRefsOnlyDeterministicNonAgg = 
upper.references.subsetOf(AttributeSet(
       lower
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
index 8218051..f66128d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
@@ -87,8 +87,8 @@ object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
     val rightProjectList = projectList(right)
 
     left.output.size == left.output.map(_.name).distinct.size &&
-      left.find(_.expressions.exists(SubqueryExpression.hasSubquery)).isEmpty 
&&
-        
right.find(_.expressions.exists(SubqueryExpression.hasSubquery)).isEmpty &&
+      !left.exists(_.expressions.exists(SubqueryExpression.hasSubquery)) &&
+        !right.exists(_.expressions.exists(SubqueryExpression.hasSubquery)) &&
           Project(leftProjectList, 
nonFilterChild(skipProject(left))).sameResult(
             Project(rightProjectList, nonFilterChild(skipProject(right))))
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index e03360d..6d683a7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -143,7 +143,7 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with 
PredicateHelper {
     val attributes = e.references.toSeq
     val emptyRow = new GenericInternalRow(attributes.length)
     val boundE = BindReferences.bindReference(e, attributes)
-    if (boundE.find(_.isInstanceOf[Unevaluable]).isDefined) return false
+    if (boundE.exists(_.isInstanceOf[Unevaluable])) return false
     val v = boundE.eval(emptyRow)
     v == null || v == false
   }
@@ -195,9 +195,9 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with 
PredicateHelper {
 object ExtractPythonUDFFromJoinCondition extends Rule[LogicalPlan] with 
PredicateHelper {
 
   private def hasUnevaluablePythonUDF(expr: Expression, j: Join): Boolean = {
-    expr.find { e =>
+    expr.exists { e =>
       PythonUDF.isScalarPythonUDF(e) && !canEvaluate(e, j.left) && 
!canEvaluate(e, j.right)
-    }.isDefined
+    }
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = 
plan.transformUpWithPruning(
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
index 6d6b8b7..7ef5ef5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
@@ -728,7 +728,7 @@ object OptimizeOneRowRelationSubquery extends 
Rule[LogicalPlan] {
   }
 
   private def hasCorrelatedSubquery(plan: LogicalPlan): Boolean = {
-    
plan.find(_.expressions.exists(SubqueryExpression.hasCorrelatedSubquery)).isDefined
+    plan.exists(_.expressions.exists(SubqueryExpression.hasCorrelatedSubquery))
   }
 
   /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 5b601fb..8a6598f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -989,7 +989,7 @@ case class Aggregate(
   final override val nodePatterns : Seq[TreePattern] = Seq(AGGREGATE)
 
   override lazy val validConstraints: ExpressionSet = {
-    val nonAgg = 
aggregateExpressions.filter(_.find(_.isInstanceOf[AggregateExpression]).isEmpty)
+    val nonAgg = 
aggregateExpressions.filter(!_.exists(_.isInstanceOf[AggregateExpression]))
     getAllValidConstraints(nonAgg)
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 9e50be3..ac60e18 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -247,6 +247,16 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] 
extends Product with Tre
   }
 
   /**
+   * Test whether there is a [[TreeNode]] that satisfies the condition 
specified in `f`.
+   * The condition is recursively applied to this node and all of its children 
(pre-order).
+   */
+  def exists(f: BaseType => Boolean): Boolean = if (f(this)) {
+    true
+  } else {
+    children.exists(_.exists(f))
+  }
+
+  /**
    * Runs the given function on this node and then recursively on [[children]].
    * @param f the function to be applied to each node in the tree.
    */
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercionSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercionSuite.scala
index 19ee6d8..1f23aeb6 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercionSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercionSuite.scala
@@ -890,7 +890,7 @@ class AnsiTypeCoercionSuite extends TypeCoercionSuiteBase {
     val wp1 = widenSetOperationTypes(union.select(p1.output.head, $"p2.v"))
     assert(wp1.isInstanceOf[Project])
     // The attribute `p1.output.head` should be replaced in the root `Project`.
-    assert(wp1.expressions.forall(_.find(_ == p1.output.head).isEmpty))
+    assert(wp1.expressions.forall(!_.exists(_ == p1.output.head)))
     val wp2 = widenSetOperationTypes(Aggregate(Nil, 
sum(p1.output.head).as("v") :: Nil, union))
     assert(wp2.isInstanceOf[Aggregate])
     assert(wp2.missingInput.isEmpty)
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
index 63ad84e..782f3e4 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
@@ -1490,7 +1490,7 @@ class TypeCoercionSuite extends TypeCoercionSuiteBase {
     val wp1 = widenSetOperationTypes(union.select(p1.output.head, $"p2.v"))
     assert(wp1.isInstanceOf[Project])
     // The attribute `p1.output.head` should be replaced in the root `Project`.
-    assert(wp1.expressions.forall(_.find(_ == p1.output.head).isEmpty))
+    assert(wp1.expressions.forall(!_.exists(_ == p1.output.head)))
     val wp2 = widenSetOperationTypes(Aggregate(Nil, 
sum(p1.output.head).as("v") :: Nil, union))
     assert(wp2.isInstanceOf[Aggregate])
     assert(wp2.missingInput.isEmpty)
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala
index dc50039..c74eeea 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala
@@ -37,7 +37,7 @@ class DecorrelateInnerQuerySuite extends PlanTest {
   val testRelation2 = LocalRelation(x, y, z)
 
   private def hasOuterReferences(plan: LogicalPlan): Boolean = {
-    plan.find(_.expressions.exists(SubExprUtils.containsOuter)).isDefined
+    plan.exists(_.expressions.exists(SubExprUtils.containsOuter))
   }
 
   private def check(
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelperSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelperSuite.scala
index 0a3f86e..4a42645 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelperSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelperSuite.scala
@@ -41,7 +41,7 @@ class AnalysisHelperSuite extends SparkFunSuite {
   test("setAnalyze is recursive") {
     val plan = Project(Nil, LocalRelation())
     plan.setAnalyzed()
-    assert(plan.find(!_.analyzed).isEmpty)
+    assert(!plan.exists(!_.analyzed))
   }
 
   test("resolveOperator runs on operators recursively") {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index b52ecb5..b6087c5 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -248,6 +248,44 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
     assert(expected === actual)
   }
 
+  test("exists") {
+    val expression = Add(Literal(1), Multiply(Literal(2), Subtract(Literal(3), 
Literal(4))))
+    // Check the top node.
+    var exists = expression.exists {
+      case _: Add => true
+      case _ => false
+    }
+    assert(exists)
+
+    // Check the first children.
+    exists = expression.exists {
+      case Literal(1, IntegerType) => true
+      case _ => false
+    }
+    assert(exists)
+
+    // Check an internal node (Subtract).
+    exists = expression.exists {
+      case _: Subtract => true
+      case _ => false
+    }
+    assert(exists)
+
+    // Check a leaf node.
+    exists = expression.exists {
+      case Literal(3, IntegerType) => true
+      case _ => false
+    }
+    assert(exists)
+
+    // Check not exists.
+    exists = expression.exists {
+      case Literal(100, IntegerType) => true
+      case _ => false
+    }
+    assert(!exists)
+  }
+
   test("collectFirst") {
     val expression = Add(Literal(1), Multiply(Literal(2), Subtract(Literal(3), 
Literal(4))))
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 4a921b4..62dea96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1503,10 +1503,10 @@ class Dataset[T] private[sql](
       case typedCol: TypedColumn[_, _] =>
         // Checks if a `TypedColumn` has been inserted with
         // specific input type and schema by `withInputType`.
-        val needInputType = typedCol.expr.find {
+        val needInputType = typedCol.expr.exists {
           case ta: TypedAggregateExpression if ta.inputDeserializer.isEmpty => 
true
           case _ => false
-        }.isDefined
+        }
 
         if (!needInputType) {
           typedCol
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 426b233..27d6bed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -161,7 +161,7 @@ class CacheManager extends Logging with 
AdaptiveSparkPlanHelper {
       blocking: Boolean = false): Unit = {
     val shouldRemove: LogicalPlan => Boolean =
       if (cascade) {
-        _.find(_.sameResult(plan)).isDefined
+        _.exists(_.sameResult(plan))
       } else {
         _.sameResult(plan)
       }
@@ -187,7 +187,7 @@ class CacheManager extends Logging with 
AdaptiveSparkPlanHelper {
         //    will keep it as it is. It means the physical plan has been 
re-compiled already in the
         //    other thread.
         val cacheAlreadyLoaded = 
cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
-        cd.plan.find(_.sameResult(plan)).isDefined && !cacheAlreadyLoaded
+        cd.plan.exists(_.sameResult(plan)) && !cacheAlreadyLoaded
       })
     }
   }
@@ -207,7 +207,7 @@ class CacheManager extends Logging with 
AdaptiveSparkPlanHelper {
    * Tries to re-cache all the cache entries that refer to the given plan.
    */
   def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit = {
-    recacheByCondition(spark, _.plan.find(_.sameResult(plan)).isDefined)
+    recacheByCondition(spark, _.plan.exists(_.sameResult(plan)))
   }
 
   /**
@@ -288,7 +288,7 @@ class CacheManager extends Logging with 
AdaptiveSparkPlanHelper {
    */
   def recacheByPath(spark: SparkSession, resourcePath: Path, fs: FileSystem): 
Unit = {
     val qualifiedPath = fs.makeQualified(resourcePath)
-    recacheByCondition(spark, _.plan.find(lookupAndRefresh(_, fs, 
qualifiedPath)).isDefined)
+    recacheByCondition(spark, _.plan.exists(lookupAndRefresh(_, fs, 
qualifiedPath)))
   }
 
   /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index db86b38..1e2fa41 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -239,7 +239,7 @@ case class FileSourceScanExec(
   }
 
   private def isDynamicPruningFilter(e: Expression): Boolean =
-    e.find(_.isInstanceOf[PlanExpression[_]]).isDefined
+    e.exists(_.isInstanceOf[PlanExpression[_]])
 
   @transient lazy val selectedPartitions: Array[PartitionDirectory] = {
     val optimizerMetadataTimeNs = 
relation.location.metadataOpsTimeNs.getOrElse(0L)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index dde976c..7d36fd5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -892,7 +892,7 @@ case class CollapseCodegenStages(
 
   private def supportCodegen(plan: SparkPlan): Boolean = plan match {
     case plan: CodegenSupport if plan.supportCodegen =>
-      val willFallback = plan.expressions.exists(_.find(e => 
!supportCodegen(e)).isDefined)
+      val willFallback = plan.expressions.exists(_.exists(e => 
!supportCodegen(e)))
       // the generated code will be huge if there are too many columns
       val hasTooManyOutputFields =
         WholeStageCodegenExec.isTooManyFields(conf, plan.schema)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala
index 06e9c18..5533bb1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala
@@ -71,7 +71,7 @@ class AQEOptimizer(conf: SQLConf) extends 
RuleExecutor[LogicalPlan] {
       previousPlan: LogicalPlan,
       currentPlan: LogicalPlan): Boolean = {
     !Utils.isTesting || (currentPlan.resolved &&
-      
currentPlan.find(PlanHelper.specialExpressionsInUnsupportedOperator(_).nonEmpty).isEmpty
 &&
+      
!currentPlan.exists(PlanHelper.specialExpressionsInUnsupportedOperator(_).nonEmpty)
 &&
       LogicalPlanIntegrity.checkIfExprIdsAreGloballyUnique(currentPlan) &&
       DataType.equalsIgnoreNullability(previousPlan.schema, 
currentPlan.schema))
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 3ec5aad..14b4525 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -332,7 +332,7 @@ case class AdaptiveSparkPlanExec(
     // Subqueries that don't belong to any query stage of the main query will 
execute after the
     // last UI update in `getFinalPhysicalPlan`, so we need to update UI here 
again to make sure
     // the newly generated nodes of those subqueries are updated.
-    if (!isSubquery && 
currentPhysicalPlan.find(_.subqueries.nonEmpty).isDefined) {
+    if (!isSubquery && currentPhysicalPlan.exists(_.subqueries.nonEmpty)) {
       getExecutionId.foreach(onUpdatePlan(_, Seq.empty))
     }
     logOnLevel(s"Final plan: $currentPhysicalPlan")
@@ -611,7 +611,7 @@ case class AdaptiveSparkPlanExec(
       stagesToReplace: Seq[QueryStageExec]): LogicalPlan = {
     var logicalPlan = plan
     stagesToReplace.foreach {
-      case stage if currentPhysicalPlan.find(_.eq(stage)).isDefined =>
+      case stage if currentPhysicalPlan.exists(_.eq(stage)) =>
         val logicalNodeOpt = 
stage.getTagValue(TEMP_LOGICAL_PLAN_TAG).orElse(stage.logicalLink)
         assert(logicalNodeOpt.isDefined)
         val logicalNode = logicalNodeOpt.get
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index 5c20845..4410f7f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -88,14 +88,14 @@ case class InsertAdaptiveSparkPlan(
   //   - The query contains sub-query.
   private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = {
     conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || {
-      plan.find {
+      plan.exists {
         case _: Exchange => true
         case p if !p.requiredChildDistribution.forall(_ == 
UnspecifiedDistribution) => true
-        case p => p.expressions.exists(_.find {
+        case p => p.expressions.exists(_.exists {
           case _: SubqueryExpression => true
           case _ => false
-        }.isDefined)
-      }.isDefined
+        })
+      }
     }
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
index 17ea93e..7e9628c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
@@ -78,7 +78,7 @@ object DetectAmbiguousSelfJoin extends Rule[LogicalPlan] {
     // We always remove the special metadata from `AttributeReference` at the 
end of this rule, so
     // Dataset column reference only exists in the root node via Dataset 
transformations like
     // `Dataset#select`.
-    if (plan.find(_.isInstanceOf[Join]).isEmpty) return 
stripColumnReferenceMetadataInPlan(plan)
+    if (!plan.exists(_.isInstanceOf[Join])) return 
stripColumnReferenceMetadataInPlan(plan)
 
     val colRefAttrs = plan.expressions.flatMap(_.collect {
       case a: AttributeReference if isColumnReference(a) => a
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
index 479bc21..1eb1082 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
@@ -141,10 +141,10 @@ object DisableUnnecessaryBucketedScan extends 
Rule[SparkPlan] {
   }
 
   def apply(plan: SparkPlan): SparkPlan = {
-    lazy val hasBucketedScan = plan.find {
+    lazy val hasBucketedScan = plan.exists {
       case scan: FileSourceScanExec => scan.bucketedScan
       case _ => false
-    }.isDefined
+    }
 
     if (!conf.bucketingEnabled || !conf.autoBucketedScanEnabled || 
!hasBucketedScan) {
       plan
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
index 48d31d8..3b5fc4a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
@@ -214,10 +214,10 @@ object PartitionPruning extends Rule[LogicalPlan] with 
PredicateHelper {
    * Search a filtering predicate in a given logical plan
    */
   private def hasSelectivePredicate(plan: LogicalPlan): Boolean = {
-    plan.find {
+    plan.exists {
       case f: Filter => isLikelySelective(f.condition)
       case _ => false
-    }.isDefined
+    }
   }
 
   /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
index 9a05e39..252565f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
@@ -58,13 +58,13 @@ case class PlanDynamicPruningFilters(sparkSession: 
SparkSession)
         // Using `sparkPlan` is a little hacky as it is based on the 
assumption that this rule is
         // the first to be applied (apart from `InsertAdaptiveSparkPlan`).
         val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty 
&&
-          plan.find {
+          plan.exists {
             case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _, _) =>
               left.sameResult(sparkPlan)
             case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right, _) =>
               right.sameResult(sparkPlan)
             case _ => false
-          }.isDefined
+          }
 
         if (canReuseExchange) {
           val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, 
sparkPlan)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
index 69802b1..a7f63aa 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
@@ -88,7 +88,7 @@ case class AggregateInPandasExec(
         (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
       case children =>
         // There should not be any other UDFs, or the children can't be 
evaluated directly.
-        assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
+        assert(children.forall(!_.exists(_.isInstanceOf[PythonUDF])))
         (ChainedPythonFunctions(Seq(udf.func)), udf.children)
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index fca43e4..c567a70 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -72,7 +72,7 @@ trait EvalPythonExec extends UnaryExecNode {
         (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
       case children =>
         // There should not be any other UDFs, or the children can't be 
evaluated directly.
-        assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
+        assert(children.forall(!_.exists(_.isInstanceOf[PythonUDF])))
         (ChainedPythonFunctions(Seq(udf.func)), udf.children)
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index 407c498..a809ea0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -45,10 +45,10 @@ object ExtractPythonUDFFromAggregate extends 
Rule[LogicalPlan] {
   }
 
   private def hasPythonUdfOverAggregate(expr: Expression, agg: Aggregate): 
Boolean = {
-    expr.find {
+    expr.exists {
       e => PythonUDF.isScalarPythonUDF(e) &&
-        (e.references.isEmpty || e.find(belongAggregate(_, agg)).isDefined)
-    }.isDefined
+        (e.references.isEmpty || e.exists(belongAggregate(_, agg)))
+    }
   }
 
   private def extract(agg: Aggregate): LogicalPlan = {
@@ -90,7 +90,7 @@ object ExtractPythonUDFFromAggregate extends 
Rule[LogicalPlan] {
  */
 object ExtractGroupingPythonUDFFromAggregate extends Rule[LogicalPlan] {
   private def hasScalarPythonUDF(e: Expression): Boolean = {
-    e.find(PythonUDF.isScalarPythonUDF).isDefined
+    e.exists(PythonUDF.isScalarPythonUDF)
   }
 
   private def extract(agg: Aggregate): LogicalPlan = {
@@ -164,7 +164,7 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with 
PredicateHelper {
   private type EvalTypeChecker = EvalType => Boolean
 
   private def hasScalarPythonUDF(e: Expression): Boolean = {
-    e.find(PythonUDF.isScalarPythonUDF).isDefined
+    e.exists(PythonUDF.isScalarPythonUDF)
   }
 
   @scala.annotation.tailrec
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
index 87102cc..e73da99 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
@@ -113,7 +113,7 @@ case class WindowInPandasExec(
         (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
       case children =>
         // There should not be any other UDFs, or the children can't be 
evaluated directly.
-        assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
+        assert(children.forall(!_.exists(_.isInstanceOf[PythonUDF])))
         (ChainedPythonFunctions(Seq(udf.func)), udf.children)
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index 8878677..afd0aba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -46,10 +46,10 @@ object ExecSubqueryExpression {
    * Returns true when an expression contains a subquery
    */
   def hasSubquery(e: Expression): Boolean = {
-    e.find {
+    e.exists {
       case _: ExecSubqueryExpression => true
       case _ => false
-    }.isDefined
+    }
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
index 7ee533a..dd30ff6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
@@ -42,7 +42,7 @@ abstract class CTEInlineSuiteBase
          """.stripMargin)
       checkAnswer(df, Nil)
       assert(
-        df.queryExecution.optimizedPlan.find(_.isInstanceOf[WithCTE]).nonEmpty,
+        df.queryExecution.optimizedPlan.exists(_.isInstanceOf[WithCTE]),
         "Non-deterministic With-CTE with multiple references should be not 
inlined.")
     }
   }
@@ -59,7 +59,7 @@ abstract class CTEInlineSuiteBase
          """.stripMargin)
       checkAnswer(df, Nil)
       assert(
-        df.queryExecution.optimizedPlan.find(_.isInstanceOf[WithCTE]).nonEmpty,
+        df.queryExecution.optimizedPlan.exists(_.isInstanceOf[WithCTE]),
         "Non-deterministic With-CTE with multiple references should be not 
inlined.")
     }
   }
@@ -76,10 +76,10 @@ abstract class CTEInlineSuiteBase
          """.stripMargin)
       checkAnswer(df, Row(0, 1) :: Row(1, 2) :: Nil)
       assert(
-        df.queryExecution.analyzed.find(_.isInstanceOf[WithCTE]).nonEmpty,
+        df.queryExecution.analyzed.exists(_.isInstanceOf[WithCTE]),
         "With-CTE should not be inlined in analyzed plan.")
       assert(
-        df.queryExecution.optimizedPlan.find(_.isInstanceOf[WithCTE]).isEmpty,
+        !df.queryExecution.optimizedPlan.exists(_.isInstanceOf[WithCTE]),
         "With-CTE with one reference should be inlined in optimized plan.")
     }
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index f33de74..11b2309 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -1110,16 +1110,16 @@ class DataFrameWindowFunctionsSuite extends QueryTest
 
       checkAnswer(windowed, Seq(Row("b", 4), Row(null, null), Row(null, null), 
Row(null, null)))
 
-      val shuffleByRequirement = windowed.queryExecution.executedPlan.find {
+      val shuffleByRequirement = windowed.queryExecution.executedPlan.exists {
         case w: WindowExec =>
-          w.child.find {
+          w.child.exists {
             case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, 
Seq("key1", "key2"))
             case _ => false
-          }.nonEmpty
+          }
         case _ => false
       }
 
-      assert(shuffleByRequirement.nonEmpty, "Can't find desired shuffle node 
from the query plan")
+      assert(shuffleByRequirement, "Can't find desired shuffle node from the 
query plan")
     }
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
index 009ccb9..2f4098d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
@@ -250,7 +250,7 @@ class DatasetCacheSuite extends QueryTest
       case i: InMemoryRelation => i.cacheBuilder.cachedPlan
     }
     assert(df2LimitInnerPlan.isDefined &&
-      
df2LimitInnerPlan.get.find(_.isInstanceOf[InMemoryTableScanExec]).isEmpty)
+      !df2LimitInnerPlan.get.exists(_.isInstanceOf[InMemoryTableScanExec]))
   }
 
   test("SPARK-27739 Save stats from optimized plan") {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index 3569775..6188516 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -207,10 +207,10 @@ abstract class DynamicPartitionPruningSuiteBase
         case _: ReusedExchangeExec => // reuse check ok.
         case BroadcastQueryStageExec(_, _: ReusedExchangeExec, _) => // reuse 
check ok.
         case b: BroadcastExchangeLike =>
-          val hasReuse = plan.find {
+          val hasReuse = plan.exists {
             case ReusedExchangeExec(_, e) => e eq b
             case _ => false
-          }.isDefined
+          }
           assert(hasReuse, s"$s\nshould have been reused in\n$plan")
         case a: AdaptiveSparkPlanExec =>
           val broadcastQueryStage = collectFirst(a) {
@@ -234,7 +234,7 @@ abstract class DynamicPartitionPruningSuiteBase
         case r: ReusedSubqueryExec => r.child
         case o => o
       }
-      assert(subquery.find(_.isInstanceOf[AdaptiveSparkPlanExec]).isDefined == 
isMainQueryAdaptive)
+      assert(subquery.exists(_.isInstanceOf[AdaptiveSparkPlanExec]) == 
isMainQueryAdaptive)
     }
   }
 
@@ -344,12 +344,12 @@ abstract class DynamicPartitionPruningSuiteBase
            | )
        """.stripMargin)
 
-      val found = df.queryExecution.executedPlan.find {
+      val found = df.queryExecution.executedPlan.exists {
         case BroadcastHashJoinExec(_, _, p: ExistenceJoin, _, _, _, _, _) => 
true
         case _ => false
       }
 
-      assert(found.isEmpty)
+      assert(!found)
     }
   }
 
@@ -1560,14 +1560,14 @@ abstract class 
DynamicPartitionPruningDataSourceSuiteBase
       }
       // search dynamic pruning predicates on the executed plan
       val plan = 
query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution.executedPlan
-      val ret = plan.find {
+      val ret = plan.exists {
         case s: FileSourceScanExec => s.partitionFilters.exists {
           case _: DynamicPruningExpression => true
           case _ => false
         }
         case _ => false
       }
-      assert(ret.isDefined == false)
+      assert(!ret)
     }
   }
 }
@@ -1607,10 +1607,10 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
           val scanOption =
             find(plan) {
               case s: FileSourceScanExec =>
-                s.output.exists(_.find(_.argString(maxFields = 
100).contains("fid")).isDefined)
+                s.output.exists(_.exists(_.argString(maxFields = 
100).contains("fid")))
               case s: BatchScanExec =>
                 // we use f1 col for v2 tables due to schema pruning
-                s.output.exists(_.find(_.argString(maxFields = 
100).contains("f1")).isDefined)
+                s.output.exists(_.exists(_.argString(maxFields = 
100).contains("f1")))
               case _ => false
             }
           assert(scanOption.isDefined)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index ec6c863..4a8421a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1074,8 +1074,8 @@ class JoinSuite extends QueryTest with SharedSparkSession 
with AdaptiveSparkPlan
     val df = left.crossJoin(right).where(pythonTestUDF(left("a")) === 
right.col("c"))
 
     // Before optimization, there is a logical Filter operator.
-    val filterInAnalysis = 
df.queryExecution.analyzed.find(_.isInstanceOf[Filter])
-    assert(filterInAnalysis.isDefined)
+    val filterInAnalysis = 
df.queryExecution.analyzed.exists(_.isInstanceOf[Filter])
+    assert(filterInAnalysis)
 
     // Filter predicate was pushdown as join condition. So there is no Filter 
exec operator.
     val filterExec = 
find(df.queryExecution.executedPlan)(_.isInstanceOf[FilterExec])
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/ReplaceNullWithFalseInPredicateEndToEndSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/ReplaceNullWithFalseInPredicateEndToEndSuite.scala
index 739b405..8883e9b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/ReplaceNullWithFalseInPredicateEndToEndSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/ReplaceNullWithFalseInPredicateEndToEndSuite.scala
@@ -59,13 +59,13 @@ class ReplaceNullWithFalseInPredicateEndToEndSuite extends 
QueryTest with Shared
       val q5 = df1.selectExpr("IF(l > 1 AND null, 5, 1) AS out")
       checkAnswer(q5, Row(1) :: Row(1) :: Nil)
       q5.queryExecution.executedPlan.foreach { p =>
-        assert(p.expressions.forall(e => e.find(_.isInstanceOf[If]).isEmpty))
+        assert(p.expressions.forall(e => !e.exists(_.isInstanceOf[If])))
       }
 
       val q6 = df1.selectExpr("CASE WHEN (l > 2 AND null) THEN 3 ELSE 2 END")
       checkAnswer(q6, Row(2) :: Row(2) :: Nil)
       q6.queryExecution.executedPlan.foreach { p =>
-        assert(p.expressions.forall(e => 
e.find(_.isInstanceOf[CaseWhen]).isEmpty))
+        assert(p.expressions.forall(e => !e.exists(_.isInstanceOf[CaseWhen])))
       }
 
       checkAnswer(df1.where("IF(l > 10, false, b OR null)"), Row(1, true))
@@ -75,10 +75,10 @@ class ReplaceNullWithFalseInPredicateEndToEndSuite extends 
QueryTest with Shared
   test("SPARK-26107: Replace Literal(null, _) with FalseLiteral in 
higher-order functions") {
     def assertNoLiteralNullInPlan(df: DataFrame): Unit = {
       df.queryExecution.executedPlan.foreach { p =>
-        assert(p.expressions.forall(_.find {
+        assert(p.expressions.forall(!_.exists {
           case Literal(null, BooleanType) => true
           case _ => false
-        }.isEmpty))
+        }))
       }
     }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
index 5156bd4..cfc8b2c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
@@ -184,7 +184,7 @@ class FileDataSourceV2FallBackSuite extends QueryTest with 
SharedSparkSession {
             val df = spark.read.format(format).load(path.getCanonicalPath)
             checkAnswer(df, inputData.toDF())
             assert(
-              
df.queryExecution.executedPlan.find(_.isInstanceOf[FileSourceScanExec]).isDefined)
+              
df.queryExecution.executedPlan.exists(_.isInstanceOf[FileSourceScanExec]))
           }
         } finally {
           spark.listenerManager.unregister(listener)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/DeprecatedWholeStageCodegenSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/DeprecatedWholeStageCodegenSuite.scala
index b27a940..635c794 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/DeprecatedWholeStageCodegenSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/DeprecatedWholeStageCodegenSuite.scala
@@ -36,9 +36,9 @@ class DeprecatedWholeStageCodegenSuite extends QueryTest
       .groupByKey(_._1).agg(typed.sum(_._2))
 
     val plan = ds.queryExecution.executedPlan
-    assert(plan.find(p =>
+    assert(plan.exists(p =>
       p.isInstanceOf[WholeStageCodegenExec] &&
-        
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
+        
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]))
     assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0)))
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index dfc1b70..c3c8959 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -221,7 +221,7 @@ class PlannerSuite extends SharedSparkSession with 
AdaptiveSparkPlanHelper {
     val query = testData.select(Symbol("key"), Symbol("value"))
       .sort(Symbol("key")).limit(2).filter('key === 3)
     val planned = query.queryExecution.executedPlan
-    assert(planned.find(_.isInstanceOf[TakeOrderedAndProjectExec]).isDefined)
+    assert(planned.exists(_.isInstanceOf[TakeOrderedAndProjectExec]))
   }
 
   test("CollectLimit can appear in the middle of a plan when caching is used") 
{
@@ -234,11 +234,11 @@ class PlannerSuite extends SharedSparkSession with 
AdaptiveSparkPlanHelper {
     withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1000") {
       val query0 = 
testData.select(Symbol("value")).orderBy(Symbol("key")).limit(100)
       val planned0 = query0.queryExecution.executedPlan
-      
assert(planned0.find(_.isInstanceOf[TakeOrderedAndProjectExec]).isDefined)
+      assert(planned0.exists(_.isInstanceOf[TakeOrderedAndProjectExec]))
 
       val query1 = 
testData.select(Symbol("value")).orderBy(Symbol("key")).limit(2000)
       val planned1 = query1.queryExecution.executedPlan
-      assert(planned1.find(_.isInstanceOf[TakeOrderedAndProjectExec]).isEmpty)
+      assert(!planned1.exists(_.isInstanceOf[TakeOrderedAndProjectExec]))
     }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
index 5e0318d..73c4e4c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
@@ -84,7 +84,7 @@ object WholeStageCodegenSparkSubmitSuite extends Assertions 
with Logging {
     val df = spark.range(71773).select((col("id") % lit(10)).cast(IntegerType) 
as "v")
       .groupBy(array(col("v"))).agg(count(col("*")))
     val plan = df.queryExecution.executedPlan
-    assert(plan.find(_.isInstanceOf[WholeStageCodegenExec]).isDefined)
+    assert(plan.exists(_.isInstanceOf[WholeStageCodegenExec]))
 
     val expectedAnswer =
       Row(Array(0), 7178) ::
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index b5b6728..f0533f8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -37,16 +37,16 @@ class WholeStageCodegenSuite extends QueryTest with 
SharedSparkSession
   test("range/filter should be combined") {
     val df = spark.range(10).filter("id = 1").selectExpr("id + 1")
     val plan = df.queryExecution.executedPlan
-    assert(plan.find(_.isInstanceOf[WholeStageCodegenExec]).isDefined)
+    assert(plan.exists(_.isInstanceOf[WholeStageCodegenExec]))
     assert(df.collect() === Array(Row(2)))
   }
 
   test("HashAggregate should be included in WholeStageCodegen") {
     val df = spark.range(10).groupBy().agg(max(col("id")), avg(col("id")))
     val plan = df.queryExecution.executedPlan
-    assert(plan.find(p =>
+    assert(plan.exists(p =>
       p.isInstanceOf[WholeStageCodegenExec] &&
-        
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
+        
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]))
     assert(df.collect() === Array(Row(9, 4.5)))
   }
 
@@ -54,9 +54,9 @@ class WholeStageCodegenSuite extends QueryTest with 
SharedSparkSession
     val df = spark.range(10).agg(max(col("id")), avg(col("id")))
     withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true") {
       val plan = df.queryExecution.executedPlan
-      assert(plan.find(p =>
+      assert(plan.exists(p =>
         p.isInstanceOf[WholeStageCodegenExec] &&
-          
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortAggregateExec]).isDefined)
+          
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortAggregateExec]))
       assert(df.collect() === Array(Row(9, 4.5)))
     }
   }
@@ -70,22 +70,22 @@ class WholeStageCodegenSuite extends QueryTest with 
SharedSparkSession
     // Array - explode
     var expDF = df.select($"name", explode($"knownLanguages"), $"properties")
     var plan = expDF.queryExecution.executedPlan
-    assert(plan.find {
+    assert(plan.exists {
       case stage: WholeStageCodegenExec =>
-        stage.find(_.isInstanceOf[GenerateExec]).isDefined
+        stage.exists(_.isInstanceOf[GenerateExec])
       case _ => !codegenEnabled.toBoolean
-    }.isDefined)
+    })
     checkAnswer(expDF, Array(Row("James", "Java", Map("hair" -> "black", "eye" 
-> "brown")),
       Row("James", "Scala", Map("hair" -> "black", "eye" -> "brown"))))
 
     // Map - explode
     expDF = df.select($"name", $"knownLanguages", explode($"properties"))
     plan = expDF.queryExecution.executedPlan
-    assert(plan.find {
+    assert(plan.exists {
       case stage: WholeStageCodegenExec =>
-        stage.find(_.isInstanceOf[GenerateExec]).isDefined
+        stage.exists(_.isInstanceOf[GenerateExec])
       case _ => !codegenEnabled.toBoolean
-    }.isDefined)
+    })
     checkAnswer(expDF,
       Array(Row("James", List("Java", "Scala"), "hair", "black"),
         Row("James", List("Java", "Scala"), "eye", "brown")))
@@ -93,33 +93,33 @@ class WholeStageCodegenSuite extends QueryTest with 
SharedSparkSession
     // Array - posexplode
     expDF = df.select($"name", posexplode($"knownLanguages"))
     plan = expDF.queryExecution.executedPlan
-    assert(plan.find {
+    assert(plan.exists {
       case stage: WholeStageCodegenExec =>
-        stage.find(_.isInstanceOf[GenerateExec]).isDefined
+        stage.exists(_.isInstanceOf[GenerateExec])
       case _ => !codegenEnabled.toBoolean
-    }.isDefined)
+    })
     checkAnswer(expDF,
       Array(Row("James", 0, "Java"), Row("James", 1, "Scala")))
 
     // Map - posexplode
     expDF = df.select($"name", posexplode($"properties"))
     plan = expDF.queryExecution.executedPlan
-    assert(plan.find {
+    assert(plan.exists {
       case stage: WholeStageCodegenExec =>
-        stage.find(_.isInstanceOf[GenerateExec]).isDefined
+        stage.exists(_.isInstanceOf[GenerateExec])
       case _ => !codegenEnabled.toBoolean
-    }.isDefined)
+    })
     checkAnswer(expDF,
       Array(Row("James", 0, "hair", "black"), Row("James", 1, "eye", "brown")))
 
     // Array - explode , selecting all columns
     expDF = df.select($"*", explode($"knownLanguages"))
     plan = expDF.queryExecution.executedPlan
-    assert(plan.find {
+    assert(plan.exists {
       case stage: WholeStageCodegenExec =>
-        stage.find(_.isInstanceOf[GenerateExec]).isDefined
+        stage.exists(_.isInstanceOf[GenerateExec])
       case _ => !codegenEnabled.toBoolean
-    }.isDefined)
+    })
     checkAnswer(expDF,
       Array(Row("James", Seq("Java", "Scala"), Map("hair" -> "black", "eye" -> 
"brown"), "Java"),
         Row("James", Seq("Java", "Scala"), Map("hair" -> "black", "eye" -> 
"brown"), "Scala")))
@@ -127,11 +127,11 @@ class WholeStageCodegenSuite extends QueryTest with 
SharedSparkSession
     // Map - explode, selecting all columns
     expDF = df.select($"*", explode($"properties"))
     plan = expDF.queryExecution.executedPlan
-    assert(plan.find {
+    assert(plan.exists {
       case stage: WholeStageCodegenExec =>
-        stage.find(_.isInstanceOf[GenerateExec]).isDefined
+        stage.exists(_.isInstanceOf[GenerateExec])
       case _ => !codegenEnabled.toBoolean
-    }.isDefined)
+    })
     checkAnswer(expDF,
       Array(
         Row("James", List("Java", "Scala"),
@@ -143,9 +143,9 @@ class WholeStageCodegenSuite extends QueryTest with 
SharedSparkSession
   test("HashAggregate with grouping keys should be included in 
WholeStageCodegen") {
     val df = spark.range(3).groupBy(col("id") * 2).count().orderBy(col("id") * 
2)
     val plan = df.queryExecution.executedPlan
-    assert(plan.find(p =>
+    assert(plan.exists(p =>
       p.isInstanceOf[WholeStageCodegenExec] &&
-        
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
+        
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]))
     assert(df.collect() === Array(Row(0, 1), Row(2, 1), Row(4, 1)))
   }
 
@@ -154,9 +154,9 @@ class WholeStageCodegenSuite extends QueryTest with 
SharedSparkSession
     val schema = new StructType().add("k", IntegerType).add("v", StringType)
     val smallDF = spark.createDataFrame(rdd, schema)
     val df = spark.range(10).join(broadcast(smallDF), col("k") === col("id"))
-    assert(df.queryExecution.executedPlan.find(p =>
+    assert(df.queryExecution.executedPlan.exists(p =>
       p.isInstanceOf[WholeStageCodegenExec] &&
-        
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+        
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[BroadcastHashJoinExec]))
     assert(df.collect() === Array(Row(1, 1, "1"), Row(1, 1, "1"), Row(2, 2, 
"2")))
   }
 
@@ -434,9 +434,9 @@ class WholeStageCodegenSuite extends QueryTest with 
SharedSparkSession
   test("Sort should be included in WholeStageCodegen") {
     val df = spark.range(3, 0, -1).toDF().sort(col("id"))
     val plan = df.queryExecution.executedPlan
-    assert(plan.find(p =>
+    assert(plan.exists(p =>
       p.isInstanceOf[WholeStageCodegenExec] &&
-        
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortExec]).isDefined)
+        p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortExec]))
     assert(df.collect() === Array(Row(1), Row(2), Row(3)))
   }
 
@@ -445,27 +445,27 @@ class WholeStageCodegenSuite extends QueryTest with 
SharedSparkSession
 
     val ds = spark.range(10).map(_.toString)
     val plan = ds.queryExecution.executedPlan
-    assert(plan.find(p =>
+    assert(plan.exists(p =>
       p.isInstanceOf[WholeStageCodegenExec] &&
-      
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SerializeFromObjectExec]).isDefined)
+      
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SerializeFromObjectExec]))
     assert(ds.collect() === 0.until(10).map(_.toString).toArray)
   }
 
   test("typed filter should be included in WholeStageCodegen") {
     val ds = spark.range(10).filter(_ % 2 == 0)
     val plan = ds.queryExecution.executedPlan
-    assert(plan.find(p =>
+    assert(plan.exists(p =>
       p.isInstanceOf[WholeStageCodegenExec] &&
-        
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[FilterExec]).isDefined)
+        p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[FilterExec]))
     assert(ds.collect() === Array(0, 2, 4, 6, 8))
   }
 
   test("back-to-back typed filter should be included in WholeStageCodegen") {
     val ds = spark.range(10).filter(_ % 2 == 0).filter(_ % 3 == 0)
     val plan = ds.queryExecution.executedPlan
-    assert(plan.find(p =>
+    assert(plan.exists(p =>
       p.isInstanceOf[WholeStageCodegenExec] &&
-      
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[FilterExec]).isDefined)
+      p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[FilterExec]))
     assert(ds.collect() === Array(0, 6))
   }
 
@@ -517,10 +517,10 @@ class WholeStageCodegenSuite extends QueryTest with 
SharedSparkSession
         .select("int")
 
       val plan = df.queryExecution.executedPlan
-      assert(plan.find(p =>
+      assert(!plan.exists(p =>
         p.isInstanceOf[WholeStageCodegenExec] &&
           p.asInstanceOf[WholeStageCodegenExec].child.children(0)
-            .isInstanceOf[SortMergeJoinExec]).isEmpty)
+            .isInstanceOf[SortMergeJoinExec]))
       assert(df.collect() === Array(Row(1), Row(2)))
     }
   }
@@ -639,9 +639,9 @@ class WholeStageCodegenSuite extends QueryTest with 
SharedSparkSession
       val df = spark.range(100)
       val join = df.join(df, "id")
       val plan = join.queryExecution.executedPlan
-      assert(plan.find(p =>
+      assert(!plan.exists(p =>
         p.isInstanceOf[WholeStageCodegenExec] &&
-          p.asInstanceOf[WholeStageCodegenExec].codegenStageId == 0).isEmpty,
+          p.asInstanceOf[WholeStageCodegenExec].codegenStageId == 0),
         "codegen stage IDs should be preserved through ReuseExchange")
       checkAnswer(join, df.toDF)
     }
@@ -740,11 +740,11 @@ class WholeStageCodegenSuite extends QueryTest with 
SharedSparkSession
     // HashAggregateExec supports WholeStageCodegen and it's the parent of
     // LocalTableScanExec so LocalTableScanExec should be within a 
WholeStageCodegen domain.
     assert(
-      executedPlan.find {
+      executedPlan.exists {
         case WholeStageCodegenExec(
           HashAggregateExec(_, _, _, _, _, _, _: LocalTableScanExec)) => true
         case _ => false
-      }.isDefined,
+      },
       "LocalTableScanExec should be within a WholeStageCodegen domain.")
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
index 849c413..787fdc7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
@@ -44,7 +44,7 @@ object JoinBenchmark extends SqlBasedBenchmark {
     val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as 
string) as v"))
     codegenBenchmark("Join w long", N) {
       val df = spark.range(N).join(dim, (col("id") % M) === col("k"))
-      
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+      
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[BroadcastHashJoinExec]))
       df.noop()
     }
   }
@@ -55,7 +55,7 @@ object JoinBenchmark extends SqlBasedBenchmark {
     val dim = broadcast(spark.range(M).selectExpr("cast(id/10 as long) as k"))
     codegenBenchmark("Join w long duplicated", N) {
       val df = spark.range(N).join(dim, (col("id") % M) === col("k"))
-      
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+      
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[BroadcastHashJoinExec]))
       df.noop()
     }
   }
@@ -70,7 +70,7 @@ object JoinBenchmark extends SqlBasedBenchmark {
       val df = spark.range(N).join(dim2,
         (col("id") % M).cast(IntegerType) === col("k1")
           && (col("id") % M).cast(IntegerType) === col("k2"))
-      
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+      
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[BroadcastHashJoinExec]))
       df.noop()
     }
   }
@@ -84,7 +84,7 @@ object JoinBenchmark extends SqlBasedBenchmark {
     codegenBenchmark("Join w 2 longs", N) {
       val df = spark.range(N).join(dim3,
         (col("id") % M) === col("k1") && (col("id") % M) === col("k2"))
-      
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+      
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[BroadcastHashJoinExec]))
       df.noop()
     }
   }
@@ -98,7 +98,7 @@ object JoinBenchmark extends SqlBasedBenchmark {
     codegenBenchmark("Join w 2 longs duplicated", N) {
       val df = spark.range(N).join(dim4,
         (col("id") bitwiseAND M) === col("k1") && (col("id") bitwiseAND M) === 
col("k2"))
-      
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+      
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[BroadcastHashJoinExec]))
       df.noop()
     }
   }
@@ -109,7 +109,7 @@ object JoinBenchmark extends SqlBasedBenchmark {
     val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as 
string) as v"))
     codegenBenchmark("outer join w long", N) {
       val df = spark.range(N).join(dim, (col("id") % M) === col("k"), "left")
-      
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+      
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[BroadcastHashJoinExec]))
       df.noop()
     }
   }
@@ -120,7 +120,7 @@ object JoinBenchmark extends SqlBasedBenchmark {
     val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as 
string) as v"))
     codegenBenchmark("semi join w long", N) {
       val df = spark.range(N).join(dim, (col("id") % M) === col("k"), 
"leftsemi")
-      
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+      
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[BroadcastHashJoinExec]))
       df.noop()
     }
   }
@@ -131,7 +131,7 @@ object JoinBenchmark extends SqlBasedBenchmark {
       val df1 = spark.range(N).selectExpr(s"id * 2 as k1")
       val df2 = spark.range(N).selectExpr(s"id * 3 as k2")
       val df = df1.join(df2, col("k1") === col("k2"))
-      
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
+      
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[SortMergeJoinExec]))
       df.noop()
     }
   }
@@ -144,7 +144,7 @@ object JoinBenchmark extends SqlBasedBenchmark {
       val df2 = spark.range(N)
         .selectExpr(s"(id * 15485867) % ${N*10} as k2")
       val df = df1.join(df2, col("k1") === col("k2"))
-      
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
+      
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[SortMergeJoinExec]))
       df.noop()
     }
   }
@@ -159,7 +159,7 @@ object JoinBenchmark extends SqlBasedBenchmark {
         val df1 = spark.range(N).selectExpr(s"id as k1")
         val df2 = spark.range(N / 3).selectExpr(s"id * 3 as k2")
         val df = df1.join(df2, col("k1") === col("k2"))
-        
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined)
+        
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[ShuffledHashJoinExec]))
         df.noop()
       }
     }
@@ -172,8 +172,7 @@ object JoinBenchmark extends SqlBasedBenchmark {
     val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as 
string) as v"))
     codegenBenchmark("broadcast nested loop join", N) {
       val df = spark.range(N).join(dim)
-      assert(df.queryExecution.sparkPlan.find(
-        _.isInstanceOf[BroadcastNestedLoopJoinExec]).isDefined)
+      
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[BroadcastNestedLoopJoinExec]))
       df.noop()
     }
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 132b8f9..0897ad2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1353,12 +1353,12 @@ abstract class JsonSuite
   }
 
   test("Dataset toJSON doesn't construct rdd") {
-    val containsRDD = spark.emptyDataFrame.toJSON.queryExecution.logical.find {
+    val containsRDDExists = 
spark.emptyDataFrame.toJSON.queryExecution.logical.exists {
       case ExternalRDD(_, _) => true
       case _ => false
     }
 
-    assert(containsRDD.isEmpty, "Expected logical plan of toJSON to not 
contain an RDD")
+    assert(!containsRDDExists, "Expected logical plan of toJSON to not contain 
an RDD")
   }
 
   test("JSONRelation equality test") {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index 280a880..7f3809d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -734,10 +734,10 @@ abstract class OrcQuerySuite extends OrcQueryTest with 
SharedSparkSession {
 
       withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> 
"true") {
         val readDf = spark.read.orc(path)
-        val vectorizationEnabled = readDf.queryExecution.executedPlan.find {
+        val vectorizationEnabled = readDf.queryExecution.executedPlan.exists {
           case scan @ (_: FileSourceScanExec | _: BatchScanExec) => 
scan.supportsColumnar
           case _ => false
-        }.isDefined
+        }
         assert(vectorizationEnabled)
         checkAnswer(readDf, df)
       }
@@ -756,10 +756,10 @@ abstract class OrcQuerySuite extends OrcQueryTest with 
SharedSparkSession {
 
       withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> 
"true") {
         val readDf = spark.read.orc(path)
-        val vectorizationEnabled = readDf.queryExecution.executedPlan.find {
+        val vectorizationEnabled = readDf.queryExecution.executedPlan.exists {
           case scan @ (_: FileSourceScanExec | _: BatchScanExec) => 
scan.supportsColumnar
           case _ => false
-        }.isDefined
+        }
         assert(vectorizationEnabled)
         checkAnswer(readDf, df)
       }
@@ -783,10 +783,10 @@ abstract class OrcQuerySuite extends OrcQueryTest with 
SharedSparkSession {
           withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key 
-> "true",
             SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> maxNumFields) {
             val scanPlan = spark.read.orc(path).queryExecution.executedPlan
-            assert(scanPlan.find {
+            assert(scanPlan.exists {
               case scan @ (_: FileSourceScanExec | _: BatchScanExec) => 
scan.supportsColumnar
               case _ => false
-            }.isDefined == vectorizedEnabled)
+            } == vectorizedEnabled)
           }
       }
     }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
index a0bd0fb..ce98e2e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
@@ -160,9 +160,9 @@ class ForeachBatchSinkSuite extends StreamTest {
       var planAsserted = false
 
       val writer: (Dataset[T], Long) => Unit = { case (df, _) =>
-        assert(df.queryExecution.executedPlan.find { p =>
+        assert(!df.queryExecution.executedPlan.exists { p =>
           p.isInstanceOf[SerializeFromObjectExec]
-        }.isEmpty, "Untyped Dataset should not introduce serialization on 
object!")
+        }, "Untyped Dataset should not introduce serialization on object!")
         planAsserted = true
       }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index f4de713..18039db 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -463,18 +463,18 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
 
         // check existence of shuffle
         assert(
-          
joinOperator.left.find(_.isInstanceOf[ShuffleExchangeExec]).isDefined == 
shuffleLeft,
+          joinOperator.left.exists(_.isInstanceOf[ShuffleExchangeExec]) == 
shuffleLeft,
           s"expected shuffle in plan to be $shuffleLeft but 
found\n${joinOperator.left}")
         assert(
-          
joinOperator.right.find(_.isInstanceOf[ShuffleExchangeExec]).isDefined == 
shuffleRight,
+          joinOperator.right.exists(_.isInstanceOf[ShuffleExchangeExec]) == 
shuffleRight,
           s"expected shuffle in plan to be $shuffleRight but 
found\n${joinOperator.right}")
 
         // check existence of sort
         assert(
-          joinOperator.left.find(_.isInstanceOf[SortExec]).isDefined == 
sortLeft,
+          joinOperator.left.exists(_.isInstanceOf[SortExec]) == sortLeft,
           s"expected sort in the left child to be $sortLeft but 
found\n${joinOperator.left}")
         assert(
-          joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == 
sortRight,
+          joinOperator.right.exists(_.isInstanceOf[SortExec]) == sortRight,
           s"expected sort in the right child to be $sortRight but 
found\n${joinOperator.right}")
 
         // check the output partitioning
@@ -678,7 +678,7 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
         df1.groupBy("i", "j").agg(max("k")).sort("i", "j"))
 
       assert(
-        
aggregated.queryExecution.executedPlan.find(_.isInstanceOf[ShuffleExchangeExec]).isEmpty)
+        
!aggregated.queryExecution.executedPlan.exists(_.isInstanceOf[ShuffleExchangeExec]))
     }
   }
 
@@ -719,7 +719,7 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
         df1.groupBy("i", "j").agg(max("k")).sort("i", "j"))
 
       assert(
-        
aggregated.queryExecution.executedPlan.find(_.isInstanceOf[ShuffleExchangeExec]).isEmpty)
+        
!aggregated.queryExecution.executedPlan.exists(_.isInstanceOf[ShuffleExchangeExec]))
     }
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to