This is an automated email from the ASF dual-hosted git repository.
HeartSaVioR pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6d069efc1f72 [SPARK-57001][SS] Hoist `isStateful` /
`containsStatefulOperator` onto `LogicalPlan`
6d069efc1f72 is described below
commit 6d069efc1f7287c4d24815ce0b78e745df3c7b18
Author: Jungtaek Lim <[email protected]>
AuthorDate: Tue May 26 09:01:47 2026 +0900
[SPARK-57001][SS] Hoist `isStateful` / `containsStatefulOperator` onto
`LogicalPlan`
### What changes were proposed in this pull request?
Introduce two new methods on `LogicalPlan`:
- `def isStateful: Boolean = false` -- per-operator declaration of whether
the node is a streaming stateful operator (i.e. one whose state is kept across microbatches).
- `def containsStatefulOperator: Boolean` -- subtree-level check, memoized.
Override `isStateful` on the operators that become stateful when their input is streaming:
`Aggregate`, `Join` (stream-stream only), `GlobalLimit`, `Distinct`, `Deduplicate`,
`DeduplicateWithinWatermark`, `FlatMapGroupsWithState`,
`FlatMapGroupsInPandasWithState`, `TransformWithState`,
`TransformWithStateInPySpark`.
### Why are the changes needed?
This will serve as a convenient utility for future work. Currently, each rule
has to re-derive the stateful-operator check via its own pattern matching.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
N/A.
### Was this patch authored or co-authored using generative AI tooling?
Yes. Generated-by: Claude 4.6 Opus
Closes #56057 from HeartSaVioR/hoist-isStateful-logicalplan.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala | 11 +++++++++++
.../sql/catalyst/plans/logical/basicLogicalOperators.scala | 11 +++++++++++
.../org/apache/spark/sql/catalyst/plans/logical/object.scala | 2 ++
.../sql/catalyst/plans/logical/pythonLogicalOperators.scala | 2 ++
4 files changed, 26 insertions(+)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index c236f7cf08e8..f0020688eceb 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -80,6 +80,17 @@ abstract class LogicalPlan
def isStreaming: Boolean = _isStreaming
private[this] lazy val _isStreaming = children.exists(_.isStreaming)
+ /**
+ * Whether this node is a stateful operator -- i.e. one that becomes an operator
+ * that leverages the state store to maintain state across microbatches.
+ */
+ def isStateful: Boolean = false
+
+ /** Whether this plan or any of its descendants is a stateful operator
([[isStateful]]). */
+ def containsStatefulOperator: Boolean = _containsStatefulOperator
+ private[this] lazy val _containsStatefulOperator =
+ isStateful || children.exists(_.containsStatefulOperator)
+
override def verboseStringWithSuffix(maxFields: Int): String = {
super.verboseString(maxFields) + statsCache.map(", " +
_.toString).getOrElse("")
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index a27d9e526974..a7ad11848c3f 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -827,6 +827,8 @@ case class Join(
override protected def withNewChildrenInternal(
newLeft: LogicalPlan, newRight: LogicalPlan): Join = copy(left = newLeft,
right = newRight)
+
+ override def isStateful: Boolean = left.isStreaming && right.isStreaming
}
/**
@@ -1243,6 +1245,8 @@ case class Aggregate(
override protected def withNewChildInternal(newChild: LogicalPlan):
Aggregate =
copy(child = newChild)
+ override def isStateful: Boolean = child.isStreaming
+
// Whether this Aggregate operator is group only. For example: SELECT a, a
FROM t GROUP BY a
private[sql] def groupOnly: Boolean = {
// aggregateExpressions can be empty through Dateset.agg,
@@ -1757,6 +1761,8 @@ case class GlobalLimit(limitExpr: Expression, child:
LogicalPlan) extends UnaryN
override protected def withNewChildInternal(newChild: LogicalPlan):
GlobalLimit =
copy(child = newChild)
+
+ override def isStateful: Boolean = child.isStreaming
}
/**
@@ -2002,6 +2008,9 @@ case class Distinct(child: LogicalPlan) extends UnaryNode
{
final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
override protected def withNewChildInternal(newChild: LogicalPlan): Distinct
=
copy(child = newChild)
+ // Distinct is rewritten to Aggregate by ReplaceDistinctWithAggregate, hence potentially
+ // stateful (same criterion as Aggregate).
+ override def isStateful: Boolean = child.isStreaming
}
/**
@@ -2169,6 +2178,7 @@ case class Deduplicate(
final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
override protected def withNewChildInternal(newChild: LogicalPlan):
Deduplicate =
copy(child = newChild)
+ override def isStateful: Boolean = child.isStreaming
}
case class DeduplicateWithinWatermark(keys: Seq[Attribute], child:
LogicalPlan) extends UnaryNode {
@@ -2180,6 +2190,7 @@ case class DeduplicateWithinWatermark(keys:
Seq[Attribute], child: LogicalPlan)
final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
override protected def withNewChildInternal(newChild: LogicalPlan):
DeduplicateWithinWatermark =
copy(child = newChild)
+ override def isStateful: Boolean = child.isStreaming
}
/**
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index 07423b612c30..0c6f59073559 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -567,6 +567,7 @@ case class FlatMapGroupsWithState(
override protected def withNewChildrenInternal(
newLeft: LogicalPlan, newRight: LogicalPlan): FlatMapGroupsWithState =
copy(child = newLeft, initialState = newRight)
+ override def isStateful: Boolean = child.isStreaming
}
object TransformWithState {
@@ -655,6 +656,7 @@ case class TransformWithState(
override protected def withNewChildrenInternal(
newLeft: LogicalPlan, newRight: LogicalPlan): TransformWithState =
copy(child = newLeft, initialState = newRight)
+ override def isStateful: Boolean = child.isStreaming
}
/** Factory for constructing new `FlatMapGroupsInR` nodes. */
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
index db22a0781c0e..56dc2f6de043 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
@@ -165,6 +165,7 @@ case class FlatMapGroupsInPandasWithState(
override protected def withNewChildInternal(
newChild: LogicalPlan): FlatMapGroupsInPandasWithState = copy(child =
newChild)
+ override def isStateful: Boolean = child.isStreaming
}
/**
@@ -215,6 +216,7 @@ case class TransformWithStateInPySpark(
override protected def withNewChildrenInternal(
newLeft: LogicalPlan, newRight: LogicalPlan):
TransformWithStateInPySpark =
copy(child = newLeft, initialState = newRight)
+ override def isStateful: Boolean = child.isStreaming
def leftAttributes: Seq[Attribute] = {
assert(resolved, "This method is expected to be called after resolution.")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]