This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 0be132c [SPARK-38124][SS][FOLLOWUP] Document the current challenge on fixing distribution of stateful operator 0be132c is described below commit 0be132c128e80bc9d866001a64cb3f6331c85b1e Author: Jungtaek Lim <kabhwan.opensou...@gmail.com> AuthorDate: Tue Feb 15 11:47:42 2022 +0900 [SPARK-38124][SS][FOLLOWUP] Document the current challenge on fixing distribution of stateful operator ### What changes were proposed in this pull request? This PR proposes to add the context of the current challenge in fixing the distribution of stateful operators, even though the distribution is sort of "broken" now. This PR addresses the review comment https://github.com/apache/spark/pull/35419#discussion_r801343068 ### Why are the changes needed? In SPARK-38124 we identified an existing long-standing problem in stateful operators, but it is not easy to fix, since the fix may break existing queries if it is not carefully designed. Anyone should also be very careful when touching the required distribution. We want to document this explicitly to help others be careful whenever they work in this part of the codebase. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Code comment only changes. Closes #35512 from HeartSaVioR/SPARK-38124-followup. 
Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../sql/catalyst/plans/physical/partitioning.scala | 8 ++++++++ .../streaming/FlatMapGroupsWithStateExec.scala | 3 +++ .../sql/execution/streaming/statefulOperators.scala | 18 +++++++++++++++++- 3 files changed, 28 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 4418d32..5342c8e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -101,6 +101,14 @@ case class ClusteredDistribution( * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the * stateful operator, only [[HashPartitioning]] (and HashPartitioning in * [[PartitioningCollection]]) can satisfy this distribution. + * + * NOTE: This is applied only to stream-stream join as of now. For other stateful operators, we + * have been using ClusteredDistribution, which could construct the physical partitioning of the + * state in a different way (ClusteredDistribution requires a relaxed condition, and multiple + * partitionings can satisfy the requirement). We need to find a way to fix this while + * minimizing the possibility of breaking the existing checkpoints. + * + * TODO(SPARK-38204): address the issue explained in the above note. 
*/ case class StatefulOpClusteredDistribution( expressions: Seq[Expression], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala index a00a622..93ed591 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala @@ -93,6 +93,9 @@ case class FlatMapGroupsWithStateExec( * to have the same grouping so that the data are co-located on the same task. */ override def requiredChildDistribution: Seq[Distribution] = { + // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution + // before making any changes. + // TODO(SPARK-38204) ClusteredDistribution(groupingAttributes, stateInfo.map(_.numPartitions)) :: ClusteredDistribution(initialStateGroupAttrs, stateInfo.map(_.numPartitions)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 3431823..3ab2ad4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -334,6 +334,9 @@ case class StateStoreRestoreExec( override def outputPartitioning: Partitioning = child.outputPartitioning override def requiredChildDistribution: Seq[Distribution] = { + // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution + // before making any changes. 
+ // TODO(SPARK-38204) if (keyExpressions.isEmpty) { AllTuples :: Nil } else { @@ -493,6 +496,9 @@ case class StateStoreSaveExec( override def outputPartitioning: Partitioning = child.outputPartitioning override def requiredChildDistribution: Seq[Distribution] = { + // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution + // before making any changes. + // TODO(SPARK-38204) if (keyExpressions.isEmpty) { AllTuples :: Nil } else { @@ -573,6 +579,9 @@ case class SessionWindowStateStoreRestoreExec( } override def requiredChildDistribution: Seq[Distribution] = { + // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution + // before making any changes. + // TODO(SPARK-38204) ClusteredDistribution(keyWithoutSessionExpressions, stateInfo.map(_.numPartitions)) :: Nil } @@ -684,6 +693,9 @@ case class SessionWindowStateStoreSaveExec( override def outputPartitioning: Partitioning = child.outputPartitioning override def requiredChildDistribution: Seq[Distribution] = { + // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution + // before making any changes. + // TODO(SPARK-38204) ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil } @@ -741,8 +753,12 @@ case class StreamingDeduplicateExec( extends UnaryExecNode with StateStoreWriter with WatermarkSupport { /** Distribute by grouping attributes */ - override def requiredChildDistribution: Seq[Distribution] = + override def requiredChildDistribution: Seq[Distribution] = { + // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution + // before making any changes. 
+ // TODO(SPARK-38204) ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil + } override protected def doExecute(): RDD[InternalRow] = { metrics // force lazy init at driver --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org