This is an automated email from the ASF dual-hosted git repository.

HeartSaVioR pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 8845ff7c0269 [SPARK-57001][SS] Hoist `isStateful` / 
`containsStatefulOperator` onto `LogicalPlan`
8845ff7c0269 is described below

commit 8845ff7c0269751e5a84e5462314643c11d25d38
Author: Jungtaek Lim <[email protected]>
AuthorDate: Tue May 26 09:01:47 2026 +0900

    [SPARK-57001][SS] Hoist `isStateful` / `containsStatefulOperator` onto 
`LogicalPlan`
    
    ### What changes were proposed in this pull request?
    
    Introduce two new methods on `LogicalPlan`:
    
    - `def isStateful: Boolean = false` -- per-operator declaration of whether 
the node is a streaming stateful operator (kept across microbatches).
    - `def containsStatefulOperator: Boolean` -- subtree-level check, memoized.
    
    Override `isStateful` on the operators that are streaming stateful: 
`Aggregate`, `Join` (stream-stream), `GlobalLimit`, `Distinct`, `Deduplicate`, 
`DeduplicateWithinWatermark`, `FlatMapGroupsWithState`, 
`FlatMapGroupsInPandasWithState`, `TransformWithState`, 
`TransformWithStateInPySpark`.
    
    ### Why are the changes needed?
    
    This will be used as a convenient utility for future works. Currently we 
ask each rule to re-derive the stateful-operator check via pattern matching.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    N/A.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes. Generated-by: Claude 4.6 Opus
    
    Closes #56057 from HeartSaVioR/hoist-isStateful-logicalplan.
    
    Authored-by: Jungtaek Lim <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala | 11 +++++++++++
 .../sql/catalyst/plans/logical/basicLogicalOperators.scala    | 11 +++++++++++
 .../org/apache/spark/sql/catalyst/plans/logical/object.scala  |  2 ++
 .../sql/catalyst/plans/logical/pythonLogicalOperators.scala   |  2 ++
 4 files changed, 26 insertions(+)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index c236f7cf08e8..f0020688eceb 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -80,6 +80,17 @@ abstract class LogicalPlan
   def isStreaming: Boolean = _isStreaming
   private[this] lazy val _isStreaming = children.exists(_.isStreaming)
 
+  /**
+   * Whether this node is a stateful operator -- i.e. one that becomes an 
operator
+   * that leverage state store to maintain state across microbatches.
+   */
+  def isStateful: Boolean = false
+
+  /** Whether this plan or any of its descendants is a stateful operator 
([[isStateful]]). */
+  def containsStatefulOperator: Boolean = _containsStatefulOperator
+  private[this] lazy val _containsStatefulOperator =
+    isStateful || children.exists(_.containsStatefulOperator)
+
   override def verboseStringWithSuffix(maxFields: Int): String = {
     super.verboseString(maxFields) + statsCache.map(", " + 
_.toString).getOrElse("")
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 6d37aa0f9f6b..ac0784474e2e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -827,6 +827,8 @@ case class Join(
 
   override protected def withNewChildrenInternal(
     newLeft: LogicalPlan, newRight: LogicalPlan): Join = copy(left = newLeft, 
right = newRight)
+
+  override def isStateful: Boolean = left.isStreaming && right.isStreaming
 }
 
 /**
@@ -1244,6 +1246,8 @@ case class Aggregate(
   override protected def withNewChildInternal(newChild: LogicalPlan): 
Aggregate =
     copy(child = newChild)
 
+  override def isStateful: Boolean = child.isStreaming
+
   // Whether this Aggregate operator is group only. For example: SELECT a, a 
FROM t GROUP BY a
   private[sql] def groupOnly: Boolean = {
     // aggregateExpressions can be empty through Dateset.agg,
@@ -1758,6 +1762,8 @@ case class GlobalLimit(limitExpr: Expression, child: 
LogicalPlan) extends UnaryN
 
   override protected def withNewChildInternal(newChild: LogicalPlan): 
GlobalLimit =
     copy(child = newChild)
+
+  override def isStateful: Boolean = child.isStreaming
 }
 
 /**
@@ -2003,6 +2009,9 @@ case class Distinct(child: LogicalPlan) extends UnaryNode 
{
   final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
   override protected def withNewChildInternal(newChild: LogicalPlan): Distinct 
=
     copy(child = newChild)
+  // Distinct is rewritten to Aggregate by ReplaceDistinctWithAggregate, hence 
potentially
+  // stateful (same criteria as Aggregate).
+  override def isStateful: Boolean = child.isStreaming
 }
 
 /**
@@ -2170,6 +2179,7 @@ case class Deduplicate(
   final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
   override protected def withNewChildInternal(newChild: LogicalPlan): 
Deduplicate =
     copy(child = newChild)
+  override def isStateful: Boolean = child.isStreaming
 }
 
 case class DeduplicateWithinWatermark(keys: Seq[Attribute], child: 
LogicalPlan) extends UnaryNode {
@@ -2181,6 +2191,7 @@ case class DeduplicateWithinWatermark(keys: 
Seq[Attribute], child: LogicalPlan)
   final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
   override protected def withNewChildInternal(newChild: LogicalPlan): 
DeduplicateWithinWatermark =
     copy(child = newChild)
+  override def isStateful: Boolean = child.isStreaming
 }
 
 /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index 07423b612c30..0c6f59073559 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -567,6 +567,7 @@ case class FlatMapGroupsWithState(
   override protected def withNewChildrenInternal(
       newLeft: LogicalPlan, newRight: LogicalPlan): FlatMapGroupsWithState =
     copy(child = newLeft, initialState = newRight)
+  override def isStateful: Boolean = child.isStreaming
 }
 
 object TransformWithState {
@@ -655,6 +656,7 @@ case class TransformWithState(
   override protected def withNewChildrenInternal(
       newLeft: LogicalPlan, newRight: LogicalPlan): TransformWithState =
     copy(child = newLeft, initialState = newRight)
+  override def isStateful: Boolean = child.isStreaming
 }
 
 /** Factory for constructing new `FlatMapGroupsInR` nodes. */
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
index db22a0781c0e..56dc2f6de043 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
@@ -165,6 +165,7 @@ case class FlatMapGroupsInPandasWithState(
 
   override protected def withNewChildInternal(
     newChild: LogicalPlan): FlatMapGroupsInPandasWithState = copy(child = 
newChild)
+  override def isStateful: Boolean = child.isStreaming
 }
 
 /**
@@ -215,6 +216,7 @@ case class TransformWithStateInPySpark(
   override protected def withNewChildrenInternal(
       newLeft: LogicalPlan, newRight: LogicalPlan): 
TransformWithStateInPySpark =
     copy(child = newLeft, initialState = newRight)
+  override def isStateful: Boolean = child.isStreaming
 
   def leftAttributes: Seq[Attribute] = {
     assert(resolved, "This method is expected to be called after resolution.")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to