This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c80583879c [SPARK-40940] Remove Multi-stateful operator checkers for 
streaming queries
3c80583879c is described below

commit 3c80583879c59fc6ac051d5804c12b835e078059
Author: Wei Liu <[email protected]>
AuthorDate: Thu Nov 17 05:43:32 2022 +0900

    [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries
    
    ### What changes were proposed in this pull request?
    
    As a followup to [SPARK-40925], [github 
PR](https://github.com/apache/spark/pull/38405), Remove corresponding checks in 
UnsupportedOperationChecker so that customers don't have to explicitly add new 
conf withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") to 
use the new multi-stateful operators. In other words we are enabling 
multi-stateful operators by default.
    
    As a side effect, the API of 
`checkStreamingQueryGlobalWatermarkLimit(LogicalPlan, OutputMode)` is also 
changed to `checkStreamingQueryGlobalWatermarkLimit(LogicalPlan)`
    
    New tests are added to `MultiStatefulOperatorsSuite.scala`, but I could 
also add equivalent ones to `UnsupportedOperationsSuite.scala` if requested.
    
    ### Why are the changes needed?
    
    To enable new multiple-stateful operators by default. Right now users need 
to set SQL conf `unsupportedOperationCheck` to false explicitly, which also 
disables many other useful checks.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. All current running queries won't be impacted. But new queries could 
use chained stateful operators.
    
    ### How was this patch tested?
    
    Unit Tests.
    
    Closes #38503 from WweiL/SPARK-40940-multi-state-checkers.
    
    Authored-by: Wei Liu <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../analysis/UnsupportedOperationChecker.scala     | 135 ++--
 .../analysis/UnsupportedOperationsSuite.scala      | 303 +++++---
 .../streaming/MultiStatefulOperatorsSuite.scala    | 765 +++++++++++----------
 3 files changed, 705 insertions(+), 498 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 84795203fd1..06581e23d58 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, CurrentDate, CurrentTimestampLike, GroupingSets, 
LocalTimestamp, MonotonicallyIncreasingID, SessionWindow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, BinaryComparison, CurrentDate, CurrentTimestampLike, 
Expression, GreaterThan, GreaterThanOrEqual, GroupingSets, LessThan, 
LessThanOrEqual, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
 
 /**
  * Analyzes the presence of unsupported operations in a logical plan.
@@ -42,40 +43,97 @@ object UnsupportedOperationChecker extends Logging {
   }
 
   /**
-   * Checks for possible correctness issue in chained stateful operators. The 
behavior is
-   * controlled by SQL config 
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
-   * Once it is enabled, an analysis exception will be thrown. Otherwise, 
Spark will just
-   * print a warning message.
+   * Checks if the expression has a event time column
+   * @param exp the expression to be checked
+   * @return true if it is a event time column.
    */
-  def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
-    def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p 
match {
-      case s: Aggregate
-        if s.isStreaming && outputMode == InternalOutputModes.Append => true
-      case Join(left, right, joinType, _, _)
-        if left.isStreaming && right.isStreaming && joinType != Inner => true
-      case f: FlatMapGroupsWithState
-        if f.isStreaming && f.outputMode == OutputMode.Append() => true
-      case _ => false
+  private def hasEventTimeCol(exp: Expression): Boolean = exp.exists {
+    case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
+    case _ => false
+  }
+
+  /**
+   * Checks if the expression contains a range comparison, in which
+   * either side of the comparison is an event-time column. This is used for 
checking
+   * stream-stream time interval join.
+   * @param e the expression to be checked
+   * @return true if there is a time-interval join.
+   */
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = {
+    def hasEventTimeColBinaryComp(neq: Expression): Boolean = {
+      val exp = neq.asInstanceOf[BinaryComparison]
+      hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
     }
 
-    def isStatefulOperation(p: LogicalPlan): Boolean = p match {
-      case s: Aggregate if s.isStreaming => true
-      case _ @ Join(left, right, _, _, _) if left.isStreaming && 
right.isStreaming => true
-      case f: FlatMapGroupsWithState if f.isStreaming => true
-      case f: FlatMapGroupsInPandasWithState if f.isStreaming => true
-      case d: Deduplicate if d.isStreaming => true
+    e.exists {
+      case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | 
_: GreaterThan) =>
+        hasEventTimeColBinaryComp(neq)
       case _ => false
     }
+  }
 
-    val failWhenDetected = SQLConf.get.statefulOperatorCorrectnessCheckEnabled
+  /**
+   * This method, combined with isStatefulOperation, determines all disallowed
+   * behaviors in multiple stateful operators.
+   * Concretely, All conditions defined below cannot be followed by any 
streaming stateful
+   * operator as defined in isStatefulOperation.
+   * @param p logical plan to be checked
+   * @param outputMode query output mode
+   * @return true if it is not allowed when followed by any streaming stateful
+   * operator as defined in isStatefulOperation.
+   */
+  private def ifCannotBeFollowedByStatefulOperation(
+      p: LogicalPlan, outputMode: OutputMode): Boolean = p match {
+    case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+      left.isStreaming && right.isStreaming &&
+        otherCondition.isDefined && 
hasRangeExprAgainstEventTimeCol(otherCondition.get)
+    // FlatMapGroupsWithState configured with event time
+    case f @ FlatMapGroupsWithState(_, _, _, _, _, _, _, _, _, timeout, _, _, 
_, _, _, _)
+      if f.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true
+    case p @ FlatMapGroupsInPandasWithState(_, _, _, _, _, timeout, _)
+      if p.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true
+    case a: Aggregate if a.isStreaming && outputMode != 
InternalOutputModes.Append => true
+    // Since the Distinct node will be replaced to Aggregate in the optimizer 
rule
+    // [[ReplaceDistinctWithAggregate]], here we also need to check all 
Distinct node by
+    // assuming it as Aggregate.
+    case d @ Distinct(_: LogicalPlan) if d.isStreaming
+      && outputMode != InternalOutputModes.Append => true
+    case _ => false
+  }
+
+  /**
+   * This method is only used with ifCannotBeFollowedByStatefulOperation.
+   * Here we list up stateful operators but there is an exception for 
Deduplicate:
+   * it is only counted here when it has an event time column.
+   * @param p the logical plan to be checked
+   * @return true if there is a streaming stateful operation
+   */
+  private def isStatefulOperation(p: LogicalPlan): Boolean = p match {
+    case s: Aggregate if s.isStreaming => true
+    // Since the Distinct node will be replaced to Aggregate in the optimizer 
rule
+    // [[ReplaceDistinctWithAggregate]], here we also need to check all 
Distinct node by
+    // assuming it as Aggregate.
+    case d @ Distinct(_: LogicalPlan) if d.isStreaming => true
+    case _ @ Join(left, right, _, _, _) if left.isStreaming && 
right.isStreaming => true
+    case f: FlatMapGroupsWithState if f.isStreaming => true
+    case f: FlatMapGroupsInPandasWithState if f.isStreaming => true
+    case d: Deduplicate if d.isStreaming && d.keys.exists(hasEventTimeCol) => 
true
+    case _ => false
+  }
 
+  /**
+   * Checks for possible correctness issue in chained stateful operators. The 
behavior is
+   * controlled by SQL config 
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
+   * Once it is enabled, an analysis exception will be thrown. Otherwise, 
Spark will just
+   * print a warning message.
+   */
+  def checkStreamingQueryGlobalWatermarkLimit(plan: LogicalPlan, outputMode: 
OutputMode): Unit = {
+    val failWhenDetected = SQLConf.get.statefulOperatorCorrectnessCheckEnabled
     try {
       plan.foreach { subPlan =>
         if (isStatefulOperation(subPlan)) {
           subPlan.find { p =>
-            (p ne subPlan) && isStatefulOperationPossiblyEmitLateRows(p)
+            (p ne subPlan) && ifCannotBeFollowedByStatefulOperation(p, 
outputMode)
           }.foreach { _ =>
             val errorMsg = "Detected pattern of possible 'correctness' issue " 
+
               "due to global watermark. " +
@@ -154,15 +212,7 @@ object UnsupportedOperationChecker extends Logging {
           "DataFrames/Datasets")(plan)
     }
 
-    // Disallow multiple streaming aggregations
     val aggregates = collectStreamingAggregates(plan)
-
-    if (aggregates.size > 1) {
-      throwError(
-        "Multiple streaming aggregations are not supported with " +
-          "streaming DataFrames/Datasets")(plan)
-    }
-
     // Disallow some output mode
     outputMode match {
       case InternalOutputModes.Append if aggregates.nonEmpty =>
@@ -266,12 +316,8 @@ object UnsupportedOperationChecker extends Logging {
               " DataFrame/Dataset")
           }
           if (m.isMapGroupsWithState) {                       // check 
mapGroupsWithState
-            // allowed only in update query output mode and without aggregation
-            if (aggsInQuery.nonEmpty) {
-              throwError(
-                "mapGroupsWithState is not supported with aggregation " +
-                  "on a streaming DataFrame/Dataset")
-            } else if (outputMode != InternalOutputModes.Update) {
+            // allowed only in update query output mode
+            if (outputMode != InternalOutputModes.Update) {
               throwError(
                 "mapGroupsWithState is not supported with " +
                   s"$outputMode output mode on a streaming DataFrame/Dataset")
@@ -294,16 +340,11 @@ object UnsupportedOperationChecker extends Logging {
                 case _ =>
               }
             } else {
-              // flatMapGroupsWithState with aggregation: update operation 
mode not allowed, and
-              // *groupsWithState after aggregation not allowed
+              // flatMapGroupsWithState with aggregation: update operation 
mode not allowed
               if (m.outputMode == InternalOutputModes.Update) {
                 throwError(
                   "flatMapGroupsWithState in update mode is not supported with 
" +
                     "aggregation on a streaming DataFrame/Dataset")
-              } else if (collectStreamingAggregates(m).nonEmpty) {
-                throwError(
-                  "flatMapGroupsWithState in append mode is not supported 
after " +
-                    "aggregation on a streaming DataFrame/Dataset")
               }
             }
           }
@@ -373,10 +414,6 @@ object UnsupportedOperationChecker extends Logging {
             }
           }
 
-        case d: Deduplicate if collectStreamingAggregates(d).nonEmpty =>
-          throwError("dropDuplicates is not supported after aggregation on a " 
+
-            "streaming DataFrame/Dataset")
-
         case j @ Join(left, right, joinType, condition, _) =>
           if (left.isStreaming && right.isStreaming && outputMode != 
InternalOutputModes.Append) {
             throwError("Join between two streaming DataFrames/Datasets is not 
supported" +
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index d30bcd5af5d..64c5ea3f5b1 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -100,12 +100,6 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
with SQLHelper {
       Aggregate(Nil, aggExprs("d"), streamRelation), joinType = Inner),
     Update)
 
-  assertNotSupportedInStreamingPlan(
-    "aggregate - multiple streaming aggregations",
-    Aggregate(Nil, aggExprs("c"), Aggregate(Nil, aggExprs("d"), 
streamRelation)),
-    outputMode = Update,
-    expectedMsgs = Seq("multiple streaming aggregations"))
-
   assertSupportedInStreamingPlan(
     "aggregate - streaming aggregations in update mode",
     Aggregate(Nil, aggExprs("d"), streamRelation),
@@ -233,17 +227,6 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
with SQLHelper {
       SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> "false")
   }
 
-  for (outputMode <- Seq(Append, Update)) {
-    assertNotSupportedInStreamingPlan(
-      "flatMapGroupsWithState - flatMapGroupsWithState(Append) " +
-        s"on streaming relation after aggregation in $outputMode mode",
-      TestFlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, 
null, Append,
-        isMapGroupsWithState = false, null,
-        Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
-      outputMode = outputMode,
-      expectedMsgs = Seq("flatMapGroupsWithState", "after aggregation"))
-  }
-
   assertNotSupportedInStreamingPlan(
     "flatMapGroupsWithState - " +
       "flatMapGroupsWithState(Update) on streaming relation in complete mode",
@@ -315,17 +298,6 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
with SQLHelper {
     // future.
     expectedMsgs = Seq("Complete"))
 
-  for (outputMode <- Seq(Append, Update, Complete)) {
-    assertNotSupportedInStreamingPlan(
-      "mapGroupsWithState - mapGroupsWithState on streaming relation " +
-        s"with aggregation in $outputMode mode",
-      TestFlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, 
null, Update,
-        isMapGroupsWithState = true, null,
-        Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
-      outputMode = outputMode,
-      expectedMsgs = Seq("mapGroupsWithState", "with aggregation"))
-  }
-
   // multiple mapGroupsWithStates
   assertNotSupportedInStreamingPlan(
     "mapGroupsWithState - multiple mapGroupsWithStates on streaming relation 
and all are " +
@@ -369,19 +341,13 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
with SQLHelper {
 
   // Deduplicate
   assertSupportedInStreamingPlan(
-    "Deduplicate - Deduplicate on streaming relation before aggregation",
+    "Deduplicate - Deduplicate on streaming relation before aggregation - 
append",
     Aggregate(
       Seq(attributeWithWatermark),
       aggExprs("c"),
       Deduplicate(Seq(att), streamRelation)),
     outputMode = Append)
 
-  assertNotSupportedInStreamingPlan(
-    "Deduplicate - Deduplicate on streaming relation after aggregation",
-    Deduplicate(Seq(att), Aggregate(Nil, aggExprs("c"), streamRelation)),
-    outputMode = Complete,
-    expectedMsgs = Seq("dropDuplicates"))
-
   assertSupportedInStreamingPlan(
     "Deduplicate - Deduplicate on batch relation inside a streaming query",
     Deduplicate(Seq(att), batchRelation),
@@ -501,51 +467,217 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
with SQLHelper {
         "the nullable side and an appropriate range condition"))
   }
 
-  // stream-stream inner join doesn't emit late rows, whereas outer joins could
-  Seq((Inner, false), (LeftOuter, true), (RightOuter, true)).foreach {
-    case (joinType, expectFailure) =>
+  // multi-aggregations only supported in Append mode
+  assertPassOnGlobalWatermarkLimit(
+    "aggregate - multiple streaming aggregations - append",
+    Aggregate(Nil, aggExprs("c"), Aggregate(Nil, aggExprs("d"), 
streamRelation)),
+    outputMode = Append)
+
+  assertFailOnGlobalWatermarkLimit(
+    "aggregate - multiple streaming aggregations - update",
+    Aggregate(Nil, aggExprs("c"), Aggregate(Nil, aggExprs("d"), 
streamRelation)),
+    outputMode = Update)
+
+  assertFailOnGlobalWatermarkLimit(
+    "aggregate - multiple streaming aggregations - complete",
+    Aggregate(Nil, aggExprs("c"), Aggregate(Nil, aggExprs("d"), 
streamRelation)),
+    outputMode = Complete)
+
+  assertPassOnGlobalWatermarkLimit(
+    "flatMapGroupsWithState - flatMapGroupsWithState(Append) " +
+      s"on streaming relation after aggregation in Append mode",
+    TestFlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, 
Append,
+      isMapGroupsWithState = false, null,
+      Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
+    outputMode = Append)
+
+  // Aggregation not in Append mode followed by any stateful operators is 
disallowed
+  assertFailOnGlobalWatermarkLimit(
+    "flatMapGroupsWithState - flatMapGroupsWithState(Append) " +
+      s"on streaming relation after aggregation in Update mode",
+    TestFlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, 
Append,
+      isMapGroupsWithState = false, null,
+      Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
+    outputMode = Update)
+
+  // Aggregation not in Append mode followed by any stateful operators is 
disallowed
+  assertFailOnGlobalWatermarkLimit(
+    "mapGroupsWithState - mapGroupsWithState on streaming relation " +
+      "after aggregation in Update mode",
+    TestFlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, 
Update,
+      isMapGroupsWithState = true, null,
+      Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
+    outputMode = Update)
+
+  // FlatMapGroupsWithState followed by any stateful op not allowed, here test 
aggregation
+  assertFailOnGlobalWatermarkLimit(
+    "multiple stateful ops - FlatMapGroupsWithState followed by agg",
+    Aggregate(Nil, aggExprs("c"),
+      TestFlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, 
null, Append,
+        isMapGroupsWithState = false, GroupStateTimeout.EventTimeTimeout(), 
streamRelation)),
+    outputMode = Append)
+
+  // But allows if the FlatMapGroupsWithState has timeout on processing time
+  assertPassOnGlobalWatermarkLimit(
+    "multiple stateful ops - FlatMapGroupsWithState(process time) followed by 
agg",
+    Aggregate(Nil, aggExprs("c"),
+      TestFlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, 
null, Append,
+        isMapGroupsWithState = false, 
GroupStateTimeout.ProcessingTimeTimeout(), streamRelation)),
+    outputMode = Append)
+
+  // MapGroupsWithState followed by any stateful op not allowed, here test 
aggregation
+  assertFailOnGlobalWatermarkLimit(
+    "multiple stateful ops - MapGroupsWithState followed by agg",
+    Aggregate(Nil, aggExprs("c"),
+      TestFlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, 
null, Update,
+        isMapGroupsWithState = true, GroupStateTimeout.EventTimeTimeout(), 
streamRelation)),
+    outputMode = Append)
+
+  // But allows if the MapGroupsWithState has timeout on processing time
+  assertPassOnGlobalWatermarkLimit(
+    "multiple stateful ops - MapGroupsWithState(process time) followed by agg",
+    Aggregate(Nil, aggExprs("c"),
+      TestFlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, 
null, Update,
+        isMapGroupsWithState = true, 
GroupStateTimeout.ProcessingTimeTimeout(), streamRelation)),
+    outputMode = Append)
+
+  // stream-stream relation, time interval join can't be followed by any 
stateful operators
+  assertFailOnGlobalWatermarkLimit(
+    "multiple stateful ops - stream-stream time-interval join followed by agg",
+    Aggregate(Nil, aggExprs("c"),
+      streamRelation.join(streamRelation, joinType = Inner,
+        condition = Some(attribute === attribute &&
+          attributeWithWatermark > attributeWithWatermark + 10))),
+    outputMode = Append)
+
+  // stream-stream relation, only equality join can be followed by any 
stateful operators
+  assertPassOnGlobalWatermarkLimit(
+    "multiple stateful ops - stream-stream equality join followed by agg",
+    Aggregate(Nil, aggExprs("c"),
+      streamRelation.join(streamRelation, joinType = Inner,
+        condition = Some(attribute === attribute))),
+    outputMode = Append)
+
+  // Deduplication checks:
+  // Deduplication, if on event time column, is a stateful operator
+  // and cannot be placed after FlatMapGroupsWithState
+  assertFailOnGlobalWatermarkLimit(
+    "multiple stateful ops - FlatMapGroupsWithState followed by " +
+      "dedup (with event-time)",
+    Deduplicate(Seq(attributeWithWatermark),
+      TestFlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, 
null, Append,
+        isMapGroupsWithState = false, GroupStateTimeout.EventTimeTimeout(), 
streamRelation)),
+    outputMode = Append)
+
+  // Deduplication, if not on event time column,
+  // although it is still a stateful operator,
+  // it can be placed after FlatMapGroupsWithState
+  assertPassOnGlobalWatermarkLimit(
+    "multiple stateful ops - FlatMapGroupsWithState followed by " +
+      "dedup (without event-time)",
+    Deduplicate(Seq(att),
+      TestFlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, 
null, Append,
+        isMapGroupsWithState = false, null, streamRelation)),
+    outputMode = Append)
+
+  // Deduplication, if on event time column, is a stateful operator
+  // and cannot be placed after aggregation
+  for (outputMode <- Seq(Update, Complete)) {
+    assertFailOnGlobalWatermarkLimit(
+      s"multiple stateful ops - aggregation($outputMode mode) followed by " +
+        "dedup (with event-time)",
+      Deduplicate(Seq(attributeWithWatermark),
+        Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
+      outputMode = outputMode)
+
+    // Deduplication, if not on event time column,
+    // although it is still a stateful operator,
+    // it can be placed after aggregation
+    assertPassOnGlobalWatermarkLimit(
+      s"multiple stateful ops - aggregation($outputMode mode) followed by " +
+        "dedup (without event-time)",
+      Deduplicate(Seq(att),
+        Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
+      outputMode = outputMode)
+  }
+
+  // Deduplication, if on event time column, is a stateful operator
+  // and cannot be placed after join
+  assertFailOnGlobalWatermarkLimit(
+    "multiple stateful ops - stream-stream time interval join followed by" +
+      "dedup (with event-time)",
+    Deduplicate(Seq(attributeWithWatermark),
+      streamRelation.join(streamRelation, joinType = Inner,
+        condition = Some(attribute === attribute &&
+          attributeWithWatermark > attributeWithWatermark + 10))),
+    outputMode = Append)
+
+  // Deduplication, if not on event time column,
+  // although it is still a stateful operator,
+  // it can be placed after join
+  assertPassOnGlobalWatermarkLimit(
+    "multiple stateful ops - stream-stream time interval join followed by" +
+      "dedup (without event-time)",
+    Deduplicate(Seq(att),
+      streamRelation.join(streamRelation, joinType = Inner,
+        condition = Some(attribute === attribute &&
+          attributeWithWatermark > attributeWithWatermark + 10))),
+    outputMode = Append)
+
+  // for a stream-stream join followed by a stateful operator,
+  // if the join is keyed on time-interval inequality conditions (inequality 
on watermarked cols),
+  // should fail.
+  // if the join is keyed on time-interval equality conditions -> should pass
+  Seq(Inner, LeftOuter, RightOuter, FullOuter).foreach {
+    joinType =>
+      assertFailOnGlobalWatermarkLimit(
+        s"streaming aggregation after " +
+          s"stream-stream $joinType join keyed on time inequality in Append 
mode are not supported",
+        streamRelation.join(streamRelation, joinType = joinType,
+          condition = Some(attributeWithWatermark === attribute &&
+            attributeWithWatermark < attributeWithWatermark + 10))
+          .groupBy("a")(count("*")),
+        outputMode = Append)
+
       assertPassOnGlobalWatermarkLimit(
         s"single $joinType join in Append mode",
         streamRelation.join(streamRelation, joinType = RightOuter,
           condition = Some(attributeWithWatermark === attribute)),
-        OutputMode.Append())
+        outputMode = Append)
 
-      testGlobalWatermarkLimit(
-        s"streaming aggregation after stream-stream $joinType join in Append 
mode",
+      assertPassOnGlobalWatermarkLimit(
+        s"streaming aggregation after " +
+          s"stream-stream $joinType join keyed on time equality in Append mode 
are supported",
         streamRelation.join(streamRelation, joinType = joinType,
           condition = Some(attributeWithWatermark === attribute))
           .groupBy("a")(count("*")),
-        OutputMode.Append(),
-        expectFailure = expectFailure)
+        outputMode = Append)
 
       Seq(Inner, LeftOuter, RightOuter).foreach { joinType2 =>
-        testGlobalWatermarkLimit(
+        assertPassOnGlobalWatermarkLimit(
           s"streaming-stream $joinType2 after stream-stream $joinType join in 
Append mode",
           streamRelation.join(
             streamRelation.join(streamRelation, joinType = joinType,
               condition = Some(attributeWithWatermark === attribute)),
             joinType = joinType2,
             condition = Some(attributeWithWatermark === attribute)),
-          OutputMode.Append(),
-          expectFailure = expectFailure)
+          outputMode = Append)
       }
 
-      testGlobalWatermarkLimit(
+      assertPassOnGlobalWatermarkLimit(
         s"FlatMapGroupsWithState after stream-stream $joinType join in Append 
mode",
         TestFlatMapGroupsWithState(
           null, att, att, Seq(att), Seq(att), att, null, Append,
           isMapGroupsWithState = false, null,
           streamRelation.join(streamRelation, joinType = joinType,
             condition = Some(attributeWithWatermark === attribute))),
-        OutputMode.Append(),
-        expectFailure = expectFailure)
+        outputMode = Append)
 
-      testGlobalWatermarkLimit(
+      assertPassOnGlobalWatermarkLimit(
         s"deduplicate after stream-stream $joinType join in Append mode",
         Deduplicate(Seq(attribute), streamRelation.join(streamRelation, 
joinType = joinType,
           condition = Some(attributeWithWatermark === attribute))),
-        OutputMode.Append(),
-        expectFailure = expectFailure)
+        outputMode = Append)
   }
 
   // Cogroup: only batch-batch is allowed
@@ -635,40 +767,36 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
with SQLHelper {
       null,
       null,
       null,
-      new TestStreamingRelationV2(attribute)), OutputMode.Append())
+      new TestStreamingRelationV2(attribute)), outputMode = Append)
 
   // streaming aggregation
   {
     assertPassOnGlobalWatermarkLimit(
       "single streaming aggregation in Append mode",
-      streamRelation.groupBy("a")(count("*")),
-      OutputMode.Append())
+      streamRelation.groupBy("a")(count("*")), outputMode = Append)
 
-    assertFailOnGlobalWatermarkLimit(
+    assertPassOnGlobalWatermarkLimit(
       "chained streaming aggregations in Append mode",
-      streamRelation.groupBy("a")(count("*")).groupBy()(count("*")),
-      OutputMode.Append())
+      streamRelation.groupBy("a")(count("*")).groupBy()(count("*")), 
outputMode = Append)
 
     Seq(Inner, LeftOuter, RightOuter).foreach { joinType =>
       val plan = streamRelation.join(streamRelation.groupBy("a")(count("*")), 
joinType = joinType)
-      assertFailOnGlobalWatermarkLimit(
+      assertPassOnGlobalWatermarkLimit(
         s"$joinType join after streaming aggregation in Append mode",
         streamRelation.join(streamRelation.groupBy("a")(count("*")), joinType 
= joinType),
         OutputMode.Append())
     }
 
-    assertFailOnGlobalWatermarkLimit(
+    assertPassOnGlobalWatermarkLimit(
       "deduplicate after streaming aggregation in Append mode",
-      Deduplicate(Seq(attribute), streamRelation.groupBy("a")(count("*"))),
-      OutputMode.Append())
+      Deduplicate(Seq(attribute), streamRelation.groupBy("a")(count("*"))), 
OutputMode.Append())
 
-    assertFailOnGlobalWatermarkLimit(
+    assertPassOnGlobalWatermarkLimit(
       "FlatMapGroupsWithState after streaming aggregation in Append mode",
       TestFlatMapGroupsWithState(
         null, att, att, Seq(att), Seq(att), att, null, Append,
         isMapGroupsWithState = false, null,
-        streamRelation.groupBy("a")(count("*"))),
-      OutputMode.Append())
+        streamRelation.groupBy("a")(count("*"))), outputMode = Append)
   }
 
   // FlatMapGroupsWithState
@@ -677,24 +805,23 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
with SQLHelper {
       "single FlatMapGroupsWithState in Append mode",
       TestFlatMapGroupsWithState(
         null, att, att, Seq(att), Seq(att), att, null, Append,
-        isMapGroupsWithState = false, null, streamRelation),
-      OutputMode.Append())
+        isMapGroupsWithState = false, null, streamRelation), outputMode = 
Append)
 
     assertFailOnGlobalWatermarkLimit(
       "streaming aggregation after FlatMapGroupsWithState in Append mode",
       TestFlatMapGroupsWithState(
         null, att, att, Seq(att), Seq(att), att, null, Append,
-        isMapGroupsWithState = false, null, 
streamRelation).groupBy("*")(count("*")),
-      OutputMode.Append())
+        isMapGroupsWithState = false, GroupStateTimeout.EventTimeTimeout(),
+        streamRelation).groupBy("*")(count("*")), outputMode = Append)
 
     Seq(Inner, LeftOuter, RightOuter).foreach { joinType =>
       assertFailOnGlobalWatermarkLimit(
         s"stream-stream $joinType after FlatMapGroupsWithState in Append mode",
         streamRelation.join(
           TestFlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, 
null, Append,
-          isMapGroupsWithState = false, null, streamRelation), joinType = 
joinType,
-          condition = Some(attributeWithWatermark === attribute)),
-        OutputMode.Append())
+          isMapGroupsWithState = false, GroupStateTimeout.EventTimeTimeout(),
+            streamRelation), joinType = joinType,
+          condition = Some(attributeWithWatermark === attribute)), outputMode 
= Append)
     }
 
     assertFailOnGlobalWatermarkLimit(
@@ -702,30 +829,27 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
with SQLHelper {
       TestFlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, 
null, Append,
         isMapGroupsWithState = false, null,
         TestFlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, 
null, Append,
-          isMapGroupsWithState = false, null, streamRelation)),
-      OutputMode.Append())
+          isMapGroupsWithState = false, GroupStateTimeout.EventTimeTimeout(), 
streamRelation)),
+      outputMode = Append)
 
-    assertFailOnGlobalWatermarkLimit(
+    assertPassOnGlobalWatermarkLimit(
       s"deduplicate after FlatMapGroupsWithState in Append mode",
       Deduplicate(Seq(attribute),
         TestFlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, 
null, Append,
-          isMapGroupsWithState = false, null, streamRelation)),
-      OutputMode.Append())
+          isMapGroupsWithState = false, null, streamRelation)), outputMode = 
Append)
   }
 
   // deduplicate
   {
     assertPassOnGlobalWatermarkLimit(
       "streaming aggregation after deduplicate in Append mode",
-      Deduplicate(Seq(attribute), streamRelation).groupBy("a")(count("*")),
-      OutputMode.Append())
+      Deduplicate(Seq(attribute), streamRelation).groupBy("a")(count("*")), 
outputMode = Append)
 
     Seq(Inner, LeftOuter, RightOuter).foreach { joinType =>
       assertPassOnGlobalWatermarkLimit(
         s"$joinType join after deduplicate in Append mode",
         streamRelation.join(Deduplicate(Seq(attribute), streamRelation), 
joinType = joinType,
-          condition = Some(attributeWithWatermark === attribute)),
-        OutputMode.Append())
+          condition = Some(attributeWithWatermark === attribute)), outputMode 
= Append)
     }
 
     assertPassOnGlobalWatermarkLimit(
@@ -733,8 +857,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite with 
SQLHelper {
       TestFlatMapGroupsWithState(
         null, att, att, Seq(att), Seq(att), att, null, Append,
         isMapGroupsWithState = false, null,
-        Deduplicate(Seq(attribute), streamRelation)),
-      OutputMode.Append())
+        Deduplicate(Seq(attribute), streamRelation)), outputMode = Append)
   }
 
   /*
@@ -941,21 +1064,21 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
with SQLHelper {
       testNamePostfix: String,
       plan: LogicalPlan,
       outputMode: OutputMode): Unit = {
-    testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure 
= false)
+    testGlobalWatermarkLimit(testNamePostfix, plan, expectFailure = false, 
outputMode)
   }
 
   def assertFailOnGlobalWatermarkLimit(
       testNamePostfix: String,
       plan: LogicalPlan,
       outputMode: OutputMode): Unit = {
-    testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure 
= true)
+    testGlobalWatermarkLimit(testNamePostfix, plan, expectFailure = true, 
outputMode)
   }
 
   def testGlobalWatermarkLimit(
       testNamePostfix: String,
       plan: LogicalPlan,
-      outputMode: OutputMode,
-      expectFailure: Boolean): Unit = {
+      expectFailure: Boolean,
+      outputMode: OutputMode): Unit = {
     test(s"Global watermark limit - $testNamePostfix") {
       if (expectFailure) {
         withSQLConf(SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> 
"true") {
@@ -966,10 +1089,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
with SQLHelper {
           assert(e.message.contains("Detected pattern of possible 
'correctness' issue"))
         }
       } else {
-        withSQLConf(SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> 
"false") {
-          UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit(
-            wrapInStreaming(plan), outputMode)
-        }
+        UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit(
+          wrapInStreaming(plan), outputMode)
       }
     }
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala
index 0a3ea40a677..eb1e0de79ca 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming
 
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.execution.streaming.state.StateStore
 import org.apache.spark.sql.functions._
@@ -40,378 +40,427 @@ class MultiStatefulOperatorsSuite
   }
 
   test("window agg -> window agg, append mode") {
-    // TODO: SPARK-40940 - Fix the unsupported ops checker to allow chaining 
of stateful ops.
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val stream = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "0 seconds")
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .groupBy(window($"window", "10 seconds"))
-        .agg(count("*").as("count"), sum("count").as("sum"))
-        .select($"window".getField("start").cast("long").as[Long],
-          $"count".as[Long], $"sum".as[Long])
-
-      testStream(stream)(
-        AddData(inputData, 10 to 21: _*),
-        // op1 W (0, 0)
-        // agg: [10, 15) 5, [15, 20) 5, [20, 25) 2
-        // output: None
-        // state: [10, 15) 5, [15, 20) 5, [20, 25) 2
-        // op2 W (0, 0)
-        // agg: None
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (0, 21)
-        // agg: None
-        // output: [10, 15) 5, [15, 20) 5
-        // state: [20, 25) 2
-        // op2 W (0, 21)
-        // agg: [10, 20) (2, 10)
-        // output: [10, 20) (2, 10)
-        // state: None
-        CheckNewAnswer((10, 2, 10)),
-        assertNumStateRows(Seq(0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0)),
-
-        AddData(inputData, 10 to 29: _*),
-        // op1 W (21, 21)
-        // agg: [10, 15) 5 - late, [15, 20) 5 - late, [20, 25) 5, [25, 30) 5
-        // output: None
-        // state: [20, 25) 7, [25, 30) 5
-        // op2 W (21, 21)
-        // agg: None
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (21, 29)
-        // agg: None
-        // output: [20, 25) 7
-        // state: [25, 30) 5
-        // op2 W (21, 29)
-        // agg: [20, 30) (1, 7)
-        // output: None
-        // state: [20, 30) (1, 7)
-        CheckNewAnswer(),
-        assertNumStateRows(Seq(1, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 2)),
-
-        // Move the watermark.
-        AddData(inputData, 30, 31),
-        // op1 W (29, 29)
-        // agg: [30, 35) 2
-        // output: None
-        // state: [25, 30) 5 [30, 35) 2
-        // op2 W (29, 29)
-        // agg: None
-        // output: None
-        // state: [20, 30) (1, 7)
-
-        // no-data batch triggered
-
-        // op1 W (29, 31)
-        // agg: None
-        // output: [25, 30) 5
-        // state: [30, 35) 2
-        // op2 W (29, 31)
-        // agg: [20, 30) (2, 12)
-        // output: [20, 30) (2, 12)
-        // state: None
-        CheckNewAnswer((20, 2, 12)),
-        assertNumStateRows(Seq(0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
-      )
-    }
+    val inputData = MemoryStream[Int]
+
+    val stream = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "0 seconds")
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .groupBy(window($"window", "10 seconds"))
+      .agg(count("*").as("count"), sum("count").as("sum"))
+      .select($"window".getField("start").cast("long").as[Long],
+        $"count".as[Long], $"sum".as[Long])
+
+    testStream(stream)(
+      AddData(inputData, 10 to 21: _*),
+      // op1 W (0, 0)
+      // agg: [10, 15) 5, [15, 20) 5, [20, 25) 2
+      // output: None
+      // state: [10, 15) 5, [15, 20) 5, [20, 25) 2
+      // op2 W (0, 0)
+      // agg: None
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 21)
+      // agg: None
+      // output: [10, 15) 5, [15, 20) 5
+      // state: [20, 25) 2
+      // op2 W (0, 21)
+      // agg: [10, 20) (2, 10)
+      // output: [10, 20) (2, 10)
+      // state: None
+      CheckNewAnswer((10, 2, 10)),
+      assertNumStateRows(Seq(0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0)),
+
+      AddData(inputData, 10 to 29: _*),
+      // op1 W (21, 21)
+      // agg: [10, 15) 5 - late, [15, 20) 5 - late, [20, 25) 5, [25, 30) 5
+      // output: None
+      // state: [20, 25) 7, [25, 30) 5
+      // op2 W (21, 21)
+      // agg: None
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (21, 29)
+      // agg: None
+      // output: [20, 25) 7
+      // state: [25, 30) 5
+      // op2 W (21, 29)
+      // agg: [20, 30) (1, 7)
+      // output: None
+      // state: [20, 30) (1, 7)
+      CheckNewAnswer(),
+      assertNumStateRows(Seq(1, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 2)),
+
+      // Move the watermark.
+      AddData(inputData, 30, 31),
+      // op1 W (29, 29)
+      // agg: [30, 35) 2
+      // output: None
+      // state: [25, 30) 5 [30, 35) 2
+      // op2 W (29, 29)
+      // agg: None
+      // output: None
+      // state: [20, 30) (1, 7)
+
+      // no-data batch triggered
+
+      // op1 W (29, 31)
+      // agg: None
+      // output: [25, 30) 5
+      // state: [30, 35) 2
+      // op2 W (29, 31)
+      // agg: [20, 30) (2, 12)
+      // output: [20, 30) (2, 12)
+      // state: None
+      CheckNewAnswer((20, 2, 12)),
+      assertNumStateRows(Seq(0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
   }
 
   test("agg -> agg -> agg, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val stream = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "0 seconds")
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .groupBy(window(window_time($"window"), "10 seconds"))
-        .agg(count("*").as("count"), sum("count").as("sum"))
-        .groupBy(window(window_time($"window"), "20 seconds"))
-        .agg(count("*").as("count"), sum("sum").as("sum"))
-        .select(
-          $"window".getField("start").cast("long").as[Long],
-          $"window".getField("end").cast("long").as[Long],
-          $"count".as[Long], $"sum".as[Long])
-
-      testStream(stream)(
-        AddData(inputData, 0 to 37: _*),
-        // op1 W (0, 0)
-        // agg: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 
30) 5, [30, 35) 5,
-        //   [35, 40) 3
-        // output: None
-        // state: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, 
[25, 30) 5, [30, 35) 5,
-        //   [35, 40) 3
-        // op2 W (0, 0)
-        // agg: None
-        // output: None
-        // state: None
-        // op3 W (0, 0)
-        // agg: None
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (0, 37)
-        // agg: None
-        // output: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, 
[25, 30) 5, [30, 35) 5
-        // state: [35, 40) 3
-        // op2 W (0, 37)
-        // agg: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10), [30, 40) 
(1, 5)
-        // output: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10)
-        // state: [30, 40) (1, 5)
-        // op3 W (0, 37)
-        // agg: [0, 20) (2, 20), [20, 40) (1, 10)
-        // output: [0, 20) (2, 20)
-        // state: [20, 40) (1, 10)
-        CheckNewAnswer((0, 20, 2, 20)),
-        assertNumStateRows(Seq(1, 1, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0, 0)),
-
-        AddData(inputData, 30 to 60: _*),
-        // op1 W (37, 37)
-        // dropped rows: [30, 35), 1 row <= note that 35, 36, 37 are still in 
effect
-        // agg: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, 
[60, 65) 1
-        // output: None
-        // state: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, 
[60, 65) 1
-        // op2 W (37, 37)
-        // output: None
-        // state: [30, 40) (1, 5)
-        // op3 W (37, 37)
-        // output: None
-        // state: [20, 40) (1, 10)
-
-        // no-data batch
-        // op1 W (37, 60)
-        // output: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5
-        // state: [60, 65) 1
-        // op2 W (37, 60)
-        // agg: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
-        // output: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
-        // state: None
-        // op3 W (37, 60)
-        // agg: [20, 40) (2, 23), [40, 60) (2, 20)
-        // output: [20, 40) (2, 23), [40, 60) (2, 20)
-        // state: None
-
-        CheckNewAnswer((20, 40, 2, 23), (40, 60, 2, 20)),
-        assertNumStateRows(Seq(0, 0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0, 1))
-      )
-    }
+    val inputData = MemoryStream[Int]
+
+    val stream = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "0 seconds")
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .groupBy(window(window_time($"window"), "10 seconds"))
+      .agg(count("*").as("count"), sum("count").as("sum"))
+      .groupBy(window(window_time($"window"), "20 seconds"))
+      .agg(count("*").as("count"), sum("sum").as("sum"))
+      .select(
+        $"window".getField("start").cast("long").as[Long],
+        $"window".getField("end").cast("long").as[Long],
+        $"count".as[Long], $"sum".as[Long])
+
+    testStream(stream)(
+      AddData(inputData, 0 to 37: _*),
+      // op1 W (0, 0)
+      // agg: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 
30) 5, [30, 35) 5,
+      //   [35, 40) 3
+      // output: None
+      // state: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 
30) 5, [30, 35) 5,
+      //   [35, 40) 3
+      // op2 W (0, 0)
+      // agg: None
+      // output: None
+      // state: None
+      // op3 W (0, 0)
+      // agg: None
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 37)
+      // agg: None
+      // output: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 
30) 5, [30, 35) 5
+      // state: [35, 40) 3
+      // op2 W (0, 37)
+      // agg: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10), [30, 40) 
(1, 5)
+      // output: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10)
+      // state: [30, 40) (1, 5)
+      // op3 W (0, 37)
+      // agg: [0, 20) (2, 20), [20, 40) (1, 10)
+      // output: [0, 20) (2, 20)
+      // state: [20, 40) (1, 10)
+      CheckNewAnswer((0, 20, 2, 20)),
+      assertNumStateRows(Seq(1, 1, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0, 0)),
+
+      AddData(inputData, 30 to 60: _*),
+      // op1 W (37, 37)
+      // dropped rows: [30, 35), 1 row <= note that 35, 36, 37 are still in 
effect
+      // agg: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, [60, 
65) 1
+      // output: None
+      // state: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, 
[60, 65) 1
+      // op2 W (37, 37)
+      // output: None
+      // state: [30, 40) (1, 5)
+      // op3 W (37, 37)
+      // output: None
+      // state: [20, 40) (1, 10)
+
+      // no-data batch
+      // op1 W (37, 60)
+      // output: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5
+      // state: [60, 65) 1
+      // op2 W (37, 60)
+      // agg: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
+      // output: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
+      // state: None
+      // op3 W (37, 60)
+      // agg: [20, 40) (2, 23), [40, 60) (2, 20)
+      // output: [20, 40) (2, 23), [40, 60) (2, 20)
+      // state: None
+
+      CheckNewAnswer((20, 40, 2, 23), (40, 60, 2, 20)),
+      assertNumStateRows(Seq(0, 0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0, 1))
+    )
   }
 
   test("stream deduplication -> aggregation, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val deduplication = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "10 seconds")
-        .dropDuplicates("value", "eventTime")
-
-      val windowedAggregation = deduplication
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"), sum("value").as("sum"))
-        .select($"window".getField("start").cast("long").as[Long],
-          $"count".as[Long])
-
-      testStream(windowedAggregation)(
-        AddData(inputData, 1 to 15: _*),
-        // op1 W (0, 0)
-        // input: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // deduplicated: None
-        // output: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // state: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // op2 W (0, 0)
-        // agg: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
-        // output: None
-        // state: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
-
-        // no-data batch triggered
-
-        // op1 W (0, 5)
-        // agg: None
-        // output: None
-        // state: 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // op2 W (0, 5)
-        // agg: None
-        // output: [0, 5) 4
-        // state: [5, 10) 5 [10, 15) 5, [15, 20) 1
-        CheckNewAnswer((0, 4)),
-        assertNumStateRows(Seq(3, 10)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
-      )
-    }
+    val inputData = MemoryStream[Int]
+
+    val deduplication = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "10 seconds")
+      .dropDuplicates("value", "eventTime")
+
+    val windowedAggregation = deduplication
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"), sum("value").as("sum"))
+      .select($"window".getField("start").cast("long").as[Long],
+        $"count".as[Long])
+
+    testStream(windowedAggregation)(
+      AddData(inputData, 1 to 15: _*),
+      // op1 W (0, 0)
+      // input: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // deduplicated: None
+      // output: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // state: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // op2 W (0, 0)
+      // agg: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
+      // output: None
+      // state: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
+
+      // no-data batch triggered
+
+      // op1 W (0, 5)
+      // agg: None
+      // output: None
+      // state: 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // op2 W (0, 5)
+      // agg: None
+      // output: [0, 5) 4
+      // state: [5, 10) 5 [10, 15) 5, [15, 20) 1
+      CheckNewAnswer((0, 4)),
+      assertNumStateRows(Seq(3, 10)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
   }
 
   test("join -> window agg, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val input1 = MemoryStream[Int]
-      val inputDF1 = input1.toDF
-        .withColumnRenamed("value", "value1")
-        .withColumn("eventTime1", timestamp_seconds($"value1"))
-        .withWatermark("eventTime1", "0 seconds")
-
-      val input2 = MemoryStream[Int]
-      val inputDF2 = input2.toDF
-        .withColumnRenamed("value", "value2")
-        .withColumn("eventTime2", timestamp_seconds($"value2"))
-        .withWatermark("eventTime2", "0 seconds")
-
-      val stream = inputDF1.join(inputDF2, expr("eventTime1 = eventTime2"), 
"inner")
-        .groupBy(window($"eventTime1", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .select($"window".getField("start").cast("long").as[Long], 
$"count".as[Long])
-
-      testStream(stream)(
-        MultiAddData(input1, 1 to 4: _*)(input2, 1 to 4: _*),
-
-        // op1 W (0, 0)
-        // join output: (1, 1), (2, 2), (3, 3), (4, 4)
-        // state: (1, 1), (2, 2), (3, 3), (4, 4)
-        // op2 W (0, 0)
-        // agg: [0, 5) 4
-        // output: None
-        // state: [0, 5) 4
-
-        // no-data batch triggered
-
-        // op1 W (0, 4)
-        // join output: None
-        // state: None
-        // op2 W (0, 4)
-        // agg: None
-        // output: None
-        // state: [0, 5) 4
-        CheckNewAnswer(),
-        assertNumStateRows(Seq(1, 0)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0)),
-
-        // Move the watermark
-        MultiAddData(input1, 5)(input2, 5),
-
-        // op1 W (4, 4)
-        // join output: (5, 5)
-        // state: (5, 5)
-        // op2 W (4, 4)
-        // agg: [5, 10) 1
-        // output: None
-        // state: [0, 5) 4, [5, 10) 1
-
-        // no-data batch triggered
-
-        // op1 W (4, 5)
-        // join output: None
-        // state: None
-        // op2 W (4, 5)
-        // agg: None
-        // output: [0, 5) 4
-        // state: [5, 10) 1
-        CheckNewAnswer((0, 4)),
-        assertNumStateRows(Seq(1, 0)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
-      )
-    }
+    val input1 = MemoryStream[Int]
+    val inputDF1 = input1.toDF()
+      .withColumnRenamed("value", "value1")
+      .withColumn("eventTime1", timestamp_seconds($"value1"))
+      .withWatermark("eventTime1", "0 seconds")
+
+    val input2 = MemoryStream[Int]
+    val inputDF2 = input2.toDF()
+      .withColumnRenamed("value", "value2")
+      .withColumn("eventTime2", timestamp_seconds($"value2"))
+      .withWatermark("eventTime2", "0 seconds")
+
+    val stream = inputDF1.join(inputDF2, expr("eventTime1 = eventTime2"), 
"inner")
+      .groupBy(window($"eventTime1", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .select($"window".getField("start").cast("long").as[Long], 
$"count".as[Long])
+
+    testStream(stream)(
+      MultiAddData(input1, 1 to 4: _*)(input2, 1 to 4: _*),
+
+      // op1 W (0, 0)
+      // join output: (1, 1), (2, 2), (3, 3), (4, 4)
+      // state: (1, 1), (2, 2), (3, 3), (4, 4)
+      // op2 W (0, 0)
+      // agg: [0, 5) 4
+      // output: None
+      // state: [0, 5) 4
+
+      // no-data batch triggered
+
+      // op1 W (0, 4)
+      // join output: None
+      // state: None
+      // op2 W (0, 4)
+      // agg: None
+      // output: None
+      // state: [0, 5) 4
+      CheckNewAnswer(),
+      assertNumStateRows(Seq(1, 0)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0)),
+
+      // Move the watermark
+      MultiAddData(input1, 5)(input2, 5),
+
+      // op1 W (4, 4)
+      // join output: (5, 5)
+      // state: (5, 5)
+      // op2 W (4, 4)
+      // agg: [5, 10) 1
+      // output: None
+      // state: [0, 5) 4, [5, 10) 1
+
+      // no-data batch triggered
+
+      // op1 W (4, 5)
+      // join output: None
+      // state: None
+      // op2 W (4, 5)
+      // agg: None
+      // output: [0, 5) 4
+      // state: [5, 10) 1
+      CheckNewAnswer((0, 4)),
+      assertNumStateRows(Seq(1, 0)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
   }
 
   test("aggregation -> stream deduplication, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val aggStream = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "0 seconds")
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .withColumn("windowEnd", expr("window.end"))
-
-      // dropDuplicates from aggStream without event time column for 
dropDuplicates - the
-      // state does not get trimmed due to watermark advancement.
-      val dedupNoEventTime = aggStream
-        .dropDuplicates("count", "windowEnd")
-        .select(
-          $"windowEnd".cast("long").as[Long],
-          $"count".as[Long])
-
-      testStream(dedupNoEventTime)(
-        AddData(inputData, 1, 5, 10, 15),
-
-        // op1 W (0, 0)
-        // agg: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
-        // output: None
-        // state: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
-        // op2 W (0, 0)
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (0, 15)
-        // agg: None
-        // output: [0, 5) 1, [5, 10) 1, [10, 15) 1
-        // state: [15, 20) 1
-        // op2 W (0, 15)
-        // output: (5, 1), (10, 1), (15, 1)
-        // state: (5, 1), (10, 1), (15, 1)
-
-        CheckNewAnswer((5, 1), (10, 1), (15, 1)),
-        assertNumStateRows(Seq(3, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
-      )
+    val inputData = MemoryStream[Int]
+
+    val aggStream = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "0 seconds")
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .withColumn("windowEnd", expr("window.end"))
+
+    // dropDuplicates from aggStream without event time column for 
dropDuplicates - the
+    // state does not get trimmed due to watermark advancement.
+    val dedupNoEventTime = aggStream
+      .dropDuplicates("count", "windowEnd")
+      .select(
+        $"windowEnd".cast("long").as[Long],
+        $"count".as[Long])
+
+    testStream(dedupNoEventTime)(
+      AddData(inputData, 1, 5, 10, 15),
+
+      // op1 W (0, 0)
+      // agg: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // output: None
+      // state: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // op2 W (0, 0)
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 15)
+      // agg: None
+      // output: [0, 5) 1, [5, 10) 1, [10, 15) 1
+      // state: [15, 20) 1
+      // op2 W (0, 15)
+      // output: (5, 1), (10, 1), (15, 1)
+      // state: (5, 1), (10, 1), (15, 1)
+
+      CheckNewAnswer((5, 1), (10, 1), (15, 1)),
+      assertNumStateRows(Seq(3, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+
+    // Similar to the above but add event time. The dedup state will get 
trimmed.
+    val dedupWithEventTime = aggStream
+      .withColumn("windowTime", expr("window_time(window)"))
+      .withColumn("windowTimeMicros", expr("unix_micros(windowTime)"))
+      .dropDuplicates("count", "windowEnd", "windowTime")
+      .select(
+        $"windowEnd".cast("long").as[Long],
+        $"windowTimeMicros".cast("long").as[Long],
+        $"count".as[Long])
+
+    testStream(dedupWithEventTime)(
+      AddData(inputData, 1, 5, 10, 15),
+
+      // op1 W (0, 0)
+      // agg: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // output: None
+      // state: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // op2 W (0, 0)
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 15)
+      // agg: None
+      // output: [0, 5) 1, [5, 10) 1, [10, 15) 1
+      // state: [15, 20) 1
+      // op2 W (0, 15)
+      // output: (5, 4999999, 1), (10, 9999999, 1), (15, 14999999, 1)
+      // state: None - trimmed by watermark
+
+      CheckNewAnswer((5, 4999999, 1), (10, 9999999, 1), (15, 14999999, 1)),
+      assertNumStateRows(Seq(0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+  }
 
-      // Similar to the above but add event time. The dedup state will get 
trimmed.
-      val dedupWithEventTime = aggStream
-        .withColumn("windowTime", expr("window_time(window)"))
-        .withColumn("windowTimeMicros", expr("unix_micros(windowTime)"))
-        .dropDuplicates("count", "windowEnd", "windowTime")
-        .select(
-          $"windowEnd".cast("long").as[Long],
-          $"windowTimeMicros".cast("long").as[Long],
-          $"count".as[Long])
-
-      testStream(dedupWithEventTime)(
-        AddData(inputData, 1, 5, 10, 15),
-
-        // op1 W (0, 0)
-        // agg: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
-        // output: None
-        // state: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
-        // op2 W (0, 0)
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (0, 15)
-        // agg: None
-        // output: [0, 5) 1, [5, 10) 1, [10, 15) 1
-        // state: [15, 20) 1
-        // op2 W (0, 15)
-        // output: (5, 4999999, 1), (10, 9999999, 1), (15, 14999999, 1)
-        // state: None - trimmed by watermark
-
-        CheckNewAnswer((5, 4999999, 1), (10, 9999999, 1), (15, 14999999, 1)),
-        assertNumStateRows(Seq(0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
+  test("join on time interval -> window agg, append mode, should fail") {
+    val input1 = MemoryStream[Int]
+    val inputDF1 = input1.toDF()
+      .withColumnRenamed("value", "value1")
+      .withColumn("eventTime1", timestamp_seconds($"value1"))
+      .withWatermark("eventTime1", "0 seconds")
+
+    val input2 = MemoryStream[(Int, Int)]
+    val inputDF2 = input2.toDS().toDF("start", "end")
+      .withColumn("eventTime2Start", timestamp_seconds($"start"))
+      .withColumn("eventTime2End", timestamp_seconds($"end"))
+      .withColumn("start2", timestamp_seconds($"start"))
+      .withWatermark("eventTime2Start", "0 seconds")
+
+    val stream = inputDF1.join(inputDF2,
+      expr("eventTime1 >= eventTime2Start AND eventTime1 < eventTime2End " +
+        "AND eventTime1 = start2"), "inner")
+      .groupBy(window($"eventTime1", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], 
$"count".as[Long])
+
+    val e = intercept[AnalysisException] {
+      testStream(stream)(
+        StartStream()
       )
     }
+    assert(e.getMessage.contains("Detected pattern of possible 'correctness' 
issue"))
+  }
+
+  test("join with range join on non-time intervals -> window agg, append mode, 
shouldn't fail") {
+    val input1 = MemoryStream[Int]
+    val inputDF1 = input1.toDF()
+      .withColumnRenamed("value", "value1")
+      .withColumn("eventTime1", timestamp_seconds($"value1"))
+      .withColumn("v1", timestamp_seconds($"value1"))
+      .withWatermark("eventTime1", "0 seconds")
+
+    val input2 = MemoryStream[(Int, Int)]
+    val inputDF2 = input2.toDS().toDF("start", "end")
+      .withColumn("eventTime2Start", timestamp_seconds($"start"))
+      .withColumn("start2", timestamp_seconds($"start"))
+      .withColumn("end2", timestamp_seconds($"end"))
+      .withWatermark("eventTime2Start", "0 seconds")
+
+    val stream = inputDF1.join(inputDF2,
+      expr("v1 >= start2 AND v1 < end2 " +
+        "AND eventTime1 = start2"), "inner")
+      .groupBy(window($"eventTime1", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], 
$"count".as[Long])
+
+    testStream(stream)(
+      AddData(input1, 1, 2, 3, 4),
+      AddData(input2, (1, 2), (2, 3), (3, 4), (4, 5)),
+      CheckNewAnswer(),
+      assertNumStateRows(Seq(1, 0)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
   }
 
   private def assertNumStateRows(numTotalRows: Seq[Long]): AssertOnQuery = 
AssertOnQuery { q =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to