This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0be132c [SPARK-38124][SS][FOLLOWUP] Document the current challenge on
fixing distribution of stateful operator
0be132c is described below
commit 0be132c128e80bc9d866001a64cb3f6331c85b1e
Author: Jungtaek Lim <[email protected]>
AuthorDate: Tue Feb 15 11:47:42 2022 +0900
[SPARK-38124][SS][FOLLOWUP] Document the current challenge on fixing
distribution of stateful operator
### What changes were proposed in this pull request?
This PR proposes to document the current challenge in fixing the
distribution of stateful operators, even though the distribution is somewhat
"broken" now.
This PR addresses the review comment
https://github.com/apache/spark/pull/35419#discussion_r801343068
### Why are the changes needed?
In SPARK-38124 we identified a long-standing problem in stateful
operators, but it is not easy to fix: a fix that is not carefully designed may
break existing queries. Anyone touching the required distribution should
therefore be very careful. We want to document this explicitly so that
others take care whenever they work in this part of the codebase.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Code comment changes only.
Closes #35512 from HeartSaVioR/SPARK-38124-followup.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../sql/catalyst/plans/physical/partitioning.scala | 8 ++++++++
.../streaming/FlatMapGroupsWithStateExec.scala | 3 +++
.../sql/execution/streaming/statefulOperators.scala | 18 +++++++++++++++++-
3 files changed, 28 insertions(+), 1 deletion(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 4418d32..5342c8e 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -101,6 +101,14 @@ case class ClusteredDistribution(
* Since this distribution relies on [[HashPartitioning]] on the physical
partitioning of the
* stateful operator, only [[HashPartitioning]] (and HashPartitioning in
* [[PartitioningCollection]]) can satisfy this distribution.
+ *
+ * NOTE: This is applied only to stream-stream join as of now. For other
stateful operators, we
+ * have been using ClusteredDistribution, which could construct the physical
partitioning of the
+ * state in different way (ClusteredDistribution requires relaxed condition
and multiple
+ * partitionings can satisfy the requirement.) We need to construct the way to
fix this with
+ * minimizing possibility to break the existing checkpoints.
+ *
+ * TODO(SPARK-38204): address the issue explained in above note.
*/
case class StatefulOpClusteredDistribution(
expressions: Seq[Expression],
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index a00a622..93ed591 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -93,6 +93,9 @@ case class FlatMapGroupsWithStateExec(
* to have the same grouping so that the data are co-lacated on the same
task.
*/
override def requiredChildDistribution: Seq[Distribution] = {
+ // NOTE: Please read through the NOTE on the classdoc of
StatefulOpClusteredDistribution
+ // before making any changes.
+ // TODO(SPARK-38204)
ClusteredDistribution(groupingAttributes, stateInfo.map(_.numPartitions))
::
ClusteredDistribution(initialStateGroupAttrs,
stateInfo.map(_.numPartitions)) ::
Nil
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 3431823..3ab2ad4 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -334,6 +334,9 @@ case class StateStoreRestoreExec(
override def outputPartitioning: Partitioning = child.outputPartitioning
override def requiredChildDistribution: Seq[Distribution] = {
+ // NOTE: Please read through the NOTE on the classdoc of
StatefulOpClusteredDistribution
+ // before making any changes.
+ // TODO(SPARK-38204)
if (keyExpressions.isEmpty) {
AllTuples :: Nil
} else {
@@ -493,6 +496,9 @@ case class StateStoreSaveExec(
override def outputPartitioning: Partitioning = child.outputPartitioning
override def requiredChildDistribution: Seq[Distribution] = {
+ // NOTE: Please read through the NOTE on the classdoc of
StatefulOpClusteredDistribution
+ // before making any changes.
+ // TODO(SPARK-38204)
if (keyExpressions.isEmpty) {
AllTuples :: Nil
} else {
@@ -573,6 +579,9 @@ case class SessionWindowStateStoreRestoreExec(
}
override def requiredChildDistribution: Seq[Distribution] = {
+ // NOTE: Please read through the NOTE on the classdoc of
StatefulOpClusteredDistribution
+ // before making any changes.
+ // TODO(SPARK-38204)
ClusteredDistribution(keyWithoutSessionExpressions,
stateInfo.map(_.numPartitions)) :: Nil
}
@@ -684,6 +693,9 @@ case class SessionWindowStateStoreSaveExec(
override def outputPartitioning: Partitioning = child.outputPartitioning
override def requiredChildDistribution: Seq[Distribution] = {
+ // NOTE: Please read through the NOTE on the classdoc of
StatefulOpClusteredDistribution
+ // before making any changes.
+ // TODO(SPARK-38204)
ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) ::
Nil
}
@@ -741,8 +753,12 @@ case class StreamingDeduplicateExec(
extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
/** Distribute by grouping attributes */
- override def requiredChildDistribution: Seq[Distribution] =
+ override def requiredChildDistribution: Seq[Distribution] = {
+ // NOTE: Please read through the NOTE on the classdoc of
StatefulOpClusteredDistribution
+ // before making any changes.
+ // TODO(SPARK-38204)
ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) ::
Nil
+ }
override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]