This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d14346437b21 [SPARK-55471] Add optimizer support for SequentialStreamingUnion
d14346437b21 is described below

commit d14346437b21c4ad21e808a89c180b497cf144b7
Author: ericm-db <[email protected]>
AuthorDate: Tue Feb 17 21:30:28 2026 +0800

    [SPARK-55471] Add optimizer support for SequentialStreamingUnion
    
    ### What changes were proposed in this pull request?
    This PR adds optimizer support for `SequentialStreamingUnion` to enable the same query optimizations that regular `Union` nodes receive. Specifically:
    
    1. **Validation refactoring**: Moved nesting validation from analysis phase to post-optimization phase
       - Removed `validateNoNesting` from `ValidateSequentialStreamingUnion` (analysis rule)
       - Added new `ValidateSequentialStreamingUnionNesting` rule that runs after the optimizer's "Union" batch
       - This allows the optimizer to flatten nested unions before validating them
    
    2. **Optimizer enhancements**:
       - Extended `PushProjectionThroughUnion` to support `SequentialStreamingUnion` by:
         - Generalizing `pushProjectionThroughUnion()` to accept any `UnionBase` subtype
         - Matching both union types through the new `SequentialOrSimpleUnion` extractor to push projections down to children
       - Extended `CombineUnions` to support `SequentialStreamingUnion` by:
         - Generalizing `flattenUnion()` to `UnionBase`, with a `canMerge` check so that `Union` and `SequentialStreamingUnion` are never merged into each other
         - Supporting flattening through `Distinct`, `Deduplicate`, and `DeduplicateWithinWatermark`
         - Enabling nested union flattening and projection pushdown through wrapping operators
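    
    The flattening behavior in item 2 can be illustrated with a toy model. This is only a sketch under stated assumptions: `Plan`, `Leaf`, `ToyUnion`, `ToySequentialUnion`, and `ToyFlatten` are hypothetical names, not Catalyst classes, and the recursion below stands in for the stack-based loop in `CombineUnions`. It shows the two properties the rule relies on: nested unions of the same type collapse left to right, and a union of a different type stays an opaque child.
    
    ```scala
    // Toy model only: these are NOT Spark's Catalyst classes.
    sealed trait Plan
    final case class Leaf(name: String) extends Plan
    final case class ToyUnion(children: Seq[Plan]) extends Plan
    final case class ToySequentialUnion(children: Seq[Plan]) extends Plan
    
    object ToyFlatten {
      // Children of a union-like node, or None for any other plan.
      private def unionChildren(p: Plan): Option[Seq[Plan]] = p match {
        case ToyUnion(cs) => Some(cs)
        case ToySequentialUnion(cs) => Some(cs)
        case _ => None
      }
    
      // True when both nodes are the same concrete union type.
      private def sameType(a: Plan, b: Plan): Boolean = (a, b) match {
        case (_: ToyUnion, _: ToyUnion) => true
        case (_: ToySequentialUnion, _: ToySequentialUnion) => true
        case _ => false
      }
    
      // Flattens nested unions of the same type as `top`, left to right,
      // so the relative order of the leaves never changes.
      def flatten(top: Plan): Plan = {
        def collect(p: Plan): Seq[Plan] = unionChildren(p) match {
          case Some(cs) if sameType(top, p) => cs.flatMap(collect)
          case _ => Seq(p)
        }
        top match {
          case _: ToyUnion => ToyUnion(collect(top))
          case _: ToySequentialUnion => ToySequentialUnion(collect(top))
          case other => other
        }
      }
    }
    ```
    
    With leaves `a`, `b`, and `c`, flattening `ToySequentialUnion(Seq(ToySequentialUnion(Seq(a, b)), c))` gives `ToySequentialUnion(Seq(a, b, c))`, while a `ToyUnion` nested under a `ToySequentialUnion` is left untouched.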
    
    ### Why are the changes needed?
    
    `SequentialStreamingUnion` is a specialized union operator for streaming queries that maintains source ordering and has specific constraints (no stateful operations, streaming sources only). However, it was missing optimizer support that regular `Union` nodes have.
    
    Without these optimizations:
    - Nested `SequentialStreamingUnion` nodes created through multiple `followedBy()` calls would be rejected at analysis time, even though they could be flattened
    - Projections above `SequentialStreamingUnion` wouldn't be pushed down, leading to unnecessary data processing
    - Users couldn't compose complex streaming queries with the same flexibility as batch queries
    
    This PR enables better query optimization for streaming workloads using `SequentialStreamingUnion`.
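    
    The projection pushdown mentioned above can be sketched on a toy plan type. `Node`, `Source`, `Select`, `SeqUnion`, and `ToyPushdown` are hypothetical names used only for illustration. The real rule also rewrites attribute references per child and requires a deterministic project list, both of which this sketch omits.
    
    ```scala
    // Toy model only: these are NOT Spark's Catalyst classes.
    sealed trait Node
    final case class Source(name: String) extends Node
    final case class Select(columns: Seq[String], child: Node) extends Node
    final case class SeqUnion(children: Seq[Node]) extends Node
    
    object ToyPushdown {
      // Rewrites Select(cols, SeqUnion(cs)) into SeqUnion(cs.map(Select(cols, _))).
      // `map` keeps the children in their original order, which matters for a
      // sequential union.
      def pushDown(plan: Node): Node = plan match {
        case Select(cols, SeqUnion(children)) if children.nonEmpty =>
          SeqUnion(children.map(child => Select(cols, child)))
        case other => other
      }
    }
    ```
    
    Each child then only produces the selected columns, instead of the union emitting full rows that are trimmed afterwards.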
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. The changes are internal optimizer improvements that maintain the same semantics. Users may see:
    - Better performance due to projection pushdown and union flattening
    - More flexible query patterns (e.g., projections over nested followedBy calls) that now optimize correctly
    
    The validation change is not user-facing because the nesting validation still rejects nested unions, just at a later phase (after optimization attempts flattening).
    
    ### How was this patch tested?
    
    1. **Updated existing tests**: Removed the two nesting-rejection tests from `SequentialStreamingUnionAnalysisSuite`, since `validateNoNesting` no longer runs during analysis
    
    2. **Added new optimizer tests** in `SetOperationSuite`:
       - `SequentialStreamingUnion: combine nested unions into one` - verifies flattening of nested unions
       - `SequentialStreamingUnion: flatten nested unions under Distinct` - verifies flattening through the Distinct operator
       - `SequentialStreamingUnion: flatten nested unions under Deduplicate` - verifies flattening through the Deduplicate operator
       - `SequentialStreamingUnion: project to each side` - verifies projection pushdown to all children
       - `SequentialStreamingUnion: expressions in project list are pushed down` - verifies expression pushdown
       - `SequentialStreamingUnion: do not merge with regular Union` - verifies that the two union types are not flattened into each other
       - `SequentialStreamingUnion: optimizer preserves child ordering` - verifies that children keep their original order
    
    All tests pass with the new optimizer rules in place.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Claude Opus 4.6
    
    Closes #54236 from ericm-db/sequential-union-optimizer-support.
    
    Authored-by: ericm-db <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../analysis/SequentialUnionAnalysis.scala         |  21 ++-
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  79 ++++++-----
 .../plans/logical/SequentialStreamingUnion.scala   |   3 +
 .../plans/logical/basicLogicalOperators.scala      |  55 ++++++++
 .../SequentialStreamingUnionAnalysisSuite.scala    |  39 ------
 .../sql/catalyst/optimizer/SetOperationSuite.scala | 153 +++++++++++++++++++++
 6 files changed, 265 insertions(+), 85 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SequentialUnionAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SequentialUnionAnalysis.scala
index 3b0e3ede9cbc..c60d4f380256 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SequentialUnionAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SequentialUnionAnalysis.scala
@@ -23,8 +23,12 @@ import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.errors.QueryCompilationErrors
 
 /**
- * Flattens nested SequentialStreamingUnion nodes into a single level.
+ * Flattens directly nested SequentialStreamingUnion nodes into a single level.
  * This allows chaining: df1.followedBy(df2).followedBy(df3)
+ *
+ * Note: This only handles direct nesting where a SequentialStreamingUnion is an immediate child.
+ * Nesting wrapped in other operators (e.g., Project(SequentialStreamingUnion(...))) is handled
+ * by the optimizer's CombineUnions rule.
  */
 object FlattenSequentialStreamingUnion extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
@@ -36,19 +40,20 @@ object FlattenSequentialStreamingUnion extends Rule[LogicalPlan] {
 }
 
 /**
- * Validates SequentialStreamingUnion constraints:
+ * Validates SequentialStreamingUnion constraints during analysis:
  * - All children must be streaming relations
- * - No nested SequentialStreamingUnions (should be flattened first)
  * - No stateful operations in any child subtrees
  *
  * Note: Minimum 2 children is enforced by the resolved property, not explicit validation.
+ * Note: Nested SequentialStreamingUnions are flattened by analysis (FlattenSequentialStreamingUnion
+ *       for direct nesting) and optimizer (CombineUnions for wrapped nesting). Tests verify this
+ *       flattening behavior.
  */
 object ValidateSequentialStreamingUnion extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = {
     plan.foreach {
       case su: SequentialStreamingUnion =>
         validateAllStreaming(su)
-        validateNoNesting(su)
         validateNoStatefulDescendants(su)
       case _ =>
     }
@@ -62,14 +67,6 @@ object ValidateSequentialStreamingUnion extends Rule[LogicalPlan] {
     }
   }
 
-  private def validateNoNesting(su: SequentialStreamingUnion): Unit = {
-    su.children.foreach { child =>
-      if (child.containsPattern(SEQUENTIAL_STREAMING_UNION)) {
-        throw QueryCompilationErrors.nestedSequentialStreamingUnionError()
-      }
-    }
-  }
-
   private def validateNoStatefulDescendants(su: SequentialStreamingUnion): Unit = {
     su.children.foreach { child =>
       if (child.exists(UnsupportedOperationChecker.isStatefulOperation)) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 125db2752b20..712d02ac8eca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1018,7 +1018,9 @@ object PushProjectionThroughUnion extends Rule[LogicalPlan] {
     project.outputSet.size == project.projectList.size
   }
 
-  def pushProjectionThroughUnion(projectList: Seq[NamedExpression], u: Union): Seq[LogicalPlan] = {
+  def pushProjectionThroughUnion(
+      projectList: Seq[NamedExpression],
+      u: UnionBase): Seq[LogicalPlan] = {
     val newFirstChild = Project(projectList, u.children.head)
     val newOtherChildren = u.children.tail.map { child =>
       val rewrites = buildRewrites(u.children.head, child)
@@ -1028,13 +1030,15 @@ object PushProjectionThroughUnion extends Rule[LogicalPlan] {
   }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
-    _.containsAllPatterns(UNION, PROJECT)) {
+    t => t.containsPattern(PROJECT) &&
+      t.containsAnyPattern(UNION, SEQUENTIAL_STREAMING_UNION)) {
 
-    // Push down deterministic projection through UNION ALL
-    case project @ Project(projectList, u: Union)
+    // Push down deterministic projection through Union or SequentialStreamingUnion.
+    // This is safe because it preserves child ordering.
+    case project @ Project(projectList, SequentialOrSimpleUnion(u))
       if projectList.forall(_.deterministic) && u.children.nonEmpty &&
         canPushProjectionThroughUnion(project) =>
-      u.copy(children = pushProjectionThroughUnion(projectList, u))
+      u.withNewChildren(pushProjectionThroughUnion(projectList, u))
   }
 }
 
@@ -1814,20 +1818,30 @@ object CombineUnions extends Rule[LogicalPlan] {
   import PushProjectionThroughUnion.{canPushProjectionThroughUnion, pushProjectionThroughUnion}
 
   def apply(plan: LogicalPlan): LogicalPlan = plan.transformDownWithPruning(
-    _.containsAnyPattern(UNION, DISTINCT_LIKE), ruleId) {
-    case u: Union => flattenUnion(u, false)
-    case Distinct(u: Union) => Distinct(flattenUnion(u, true))
+    _.containsAnyPattern(UNION, SEQUENTIAL_STREAMING_UNION, DISTINCT_LIKE), ruleId) {
+    // Flatten Union or SequentialStreamingUnion.
+    // This is safe because flattening preserves child ordering.
+    case SequentialOrSimpleUnion(u) => flattenUnion(u, false)
+    case Distinct(SequentialOrSimpleUnion(u)) => Distinct(flattenUnion(u, true))
     // Only handle distinct-like 'Deduplicate', where the keys == output
-    case Deduplicate(keys: Seq[Attribute], u: Union) if AttributeSet(keys) == u.outputSet =>
+    case Deduplicate(keys: Seq[Attribute], SequentialOrSimpleUnion(u))
+        if AttributeSet(keys) == u.outputSet =>
       Deduplicate(keys, flattenUnion(u, true))
-    case DeduplicateWithinWatermark(keys: Seq[Attribute], u: Union)
+    case DeduplicateWithinWatermark(keys: Seq[Attribute], SequentialOrSimpleUnion(u))
       if AttributeSet(keys) == u.outputSet =>
       DeduplicateWithinWatermark(keys, flattenUnion(u, true))
   }
 
-  private def flattenUnion(union: Union, flattenDistinct: Boolean): Union = {
-    val topByName = union.byName
-    val topAllowMissingCol = union.allowMissingCol
+  private def flattenUnion(union: UnionBase, flattenDistinct: Boolean): UnionBase = {
+    val topByName = SequentialOrSimpleUnion.byName(union)
+    val topAllowMissingCol = SequentialOrSimpleUnion.allowMissingCol(union)
+
+    // Helper to check if a union can be merged with the top-level union
+    def canMerge(u: UnionBase): Boolean = {
+      SequentialOrSimpleUnion.isSameType(union, u) &&
+        SequentialOrSimpleUnion.byName(u) == topByName &&
+        SequentialOrSimpleUnion.allowMissingCol(u) == topAllowMissingCol
+    }
 
     val stack = mutable.Stack[LogicalPlan](union)
     val flattened = mutable.ArrayBuffer.empty[LogicalPlan]
@@ -1843,38 +1857,35 @@ object CombineUnions extends Rule[LogicalPlan] {
               !p2.projectList.exists(SubqueryExpression.hasCorrelatedSubquery) =>
           val newProjectList = buildCleanedProjectList(p1.projectList, p2.projectList)
           stack.pushAll(Seq(p2.copy(projectList = newProjectList)))
-        case Distinct(Union(children, byName, allowMissingCol))
-            if flattenDistinct && byName == topByName && allowMissingCol == topAllowMissingCol =>
-          stack.pushAll(children.reverse)
+        case Distinct(SequentialOrSimpleUnion(u)) if flattenDistinct && canMerge(u) =>
+          stack.pushAll(u.children.reverse)
         // Only handle distinct-like 'Deduplicate', where the keys == output
-        case Deduplicate(keys: Seq[Attribute], u: Union)
-            if flattenDistinct && u.byName == topByName &&
-              u.allowMissingCol == topAllowMissingCol && AttributeSet(keys) == u.outputSet =>
+        case Deduplicate(keys: Seq[Attribute], SequentialOrSimpleUnion(u))
+            if flattenDistinct && canMerge(u) && AttributeSet(keys) == u.outputSet =>
           stack.pushAll(u.children.reverse)
-        case Union(children, byName, allowMissingCol)
-            if byName == topByName && allowMissingCol == topAllowMissingCol =>
-          stack.pushAll(children.reverse)
-        // Push down projection through Union and then push pushed plan to Stack if
+        case SequentialOrSimpleUnion(u) if canMerge(u) =>
+          stack.pushAll(u.children.reverse)
+        // Push down projection through union and then push pushed plan to Stack if
         // there is a Project.
-        case project @ Project(projectList, Distinct(u @ Union(children, byName, allowMissingCol)))
-            if projectList.forall(_.deterministic) && children.nonEmpty &&
-              flattenDistinct && byName == topByName && allowMissingCol == topAllowMissingCol &&
+        case project @ Project(projectList, Distinct(SequentialOrSimpleUnion(u)))
+            if projectList.forall(_.deterministic) && u.children.nonEmpty &&
+              flattenDistinct && canMerge(u) &&
               canPushProjectionThroughUnion(project) =>
           stack.pushAll(pushProjectionThroughUnion(projectList, u).reverse)
-        case project @ Project(projectList, Deduplicate(keys: Seq[Attribute], u: Union))
-            if projectList.forall(_.deterministic) && flattenDistinct && u.byName == topByName &&
-              u.allowMissingCol == topAllowMissingCol && AttributeSet(keys) == u.outputSet &&
-              canPushProjectionThroughUnion(project) =>
+        case project @ Project(
+            projectList, Deduplicate(keys: Seq[Attribute], SequentialOrSimpleUnion(u)))
+            if projectList.forall(_.deterministic) && flattenDistinct && canMerge(u) &&
+              AttributeSet(keys) == u.outputSet && canPushProjectionThroughUnion(project) =>
           stack.pushAll(pushProjectionThroughUnion(projectList, u).reverse)
-        case project @ Project(projectList, u @ Union(children, byName, allowMissingCol))
-            if projectList.forall(_.deterministic) && children.nonEmpty && byName == topByName &&
-              allowMissingCol == topAllowMissingCol && canPushProjectionThroughUnion(project) =>
+        case project @ Project(projectList, SequentialOrSimpleUnion(u))
+            if projectList.forall(_.deterministic) && u.children.nonEmpty &&
+              canMerge(u) && canPushProjectionThroughUnion(project) =>
           stack.pushAll(pushProjectionThroughUnion(projectList, u).reverse)
         case child =>
           flattened += child
       }
     }
-    union.copy(children = flattened.toSeq)
+    SequentialOrSimpleUnion.withNewChildren(union, flattened.toSeq)
   }
 }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SequentialStreamingUnion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SequentialStreamingUnion.scala
index e2f1a1aed5d3..8917d6e8e61a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SequentialStreamingUnion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SequentialStreamingUnion.scala
@@ -35,6 +35,9 @@ import org.apache.spark.sql.catalyst.trees.TreePattern._
  * 2. Second child begins processing
  * 3. And so on...
  *
+ * IMPORTANT: Child ordering IS semantically significant. Children are processed sequentially
+ * in the exact order specified. Optimizer rules must preserve this ordering.
+ *
  * Requirements:
  * - Minimum 2 children required
  * - All children must be streaming sources
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 6b1b068234de..3fa7a4f4685e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -566,9 +566,64 @@ abstract class UnionBase extends LogicalPlan {
   }
 }
 
+/**
+ * Extractor and helper methods for Union and SequentialStreamingUnion.
+ * Does not match other UnionBase subtypes like UnionLoop.
+ */
+object SequentialOrSimpleUnion {
+  /**
+   * Extractor that matches Union and SequentialStreamingUnion for optimizer rules.
+   */
+  def unapply(plan: LogicalPlan): Option[UnionBase] = plan match {
+    case u: Union => Some(u)
+    case u: SequentialStreamingUnion => Some(u)
+    case _ => None
+  }
+
+  /**
+   * Returns true if both unions are the same concrete type.
+   * Used during flattening to ensure Union and SequentialStreamingUnion are not merged.
+   */
+  def isSameType(u1: UnionBase, u2: UnionBase): Boolean = (u1, u2) match {
+    case (_: Union, _: Union) => true
+    case (_: SequentialStreamingUnion, _: SequentialStreamingUnion) => true
+    case _ => false
+  }
+
+  /**
+   * Extracts byName flag from Union or SequentialStreamingUnion.
+   */
+  def byName(u: UnionBase): Boolean = u match {
+    case union: Union => union.byName
+    case ssu: SequentialStreamingUnion => ssu.byName
+  }
+
+  /**
+   * Extracts allowMissingCol flag from Union or SequentialStreamingUnion.
+   */
+  def allowMissingCol(u: UnionBase): Boolean = u match {
+    case union: Union => union.allowMissingCol
+    case ssu: SequentialStreamingUnion => ssu.allowMissingCol
+  }
+
+  /**
+   * Creates a new union of the same type with the specified children.
+   * This is needed when the number of children may change (e.g., flattening) and we need
+   * to preserve the UnionBase return type rather than getting back LogicalPlan.
+   */
+  def withNewChildren(u: UnionBase, newChildren: Seq[LogicalPlan]): UnionBase = u match {
+    case union: Union => union.copy(children = newChildren)
+    case ssu: SequentialStreamingUnion => ssu.copy(children = newChildren)
+  }
+}
+
 /**
  * Logical plan for unioning multiple plans, without a distinct. This is UNION ALL in SQL.
  *
+ * NOTE: Child ordering is NOT semantically significant. Children are processed in parallel
+ * and their order does not affect the result. This allows Union-specific optimizations to
+ * reorder children (e.g., for performance), unlike SequentialStreamingUnion where order matters.
+ *
  * @param byName          Whether resolves columns in the children by column names.
  * @param allowMissingCol Allows missing columns in children query plans. If it is true,
  *                        this function allows different set of column names between two Datasets.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SequentialStreamingUnionAnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SequentialStreamingUnionAnalysisSuite.scala
index 2cfe2d271043..56915fc0d86b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SequentialStreamingUnionAnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SequentialStreamingUnionAnalysisSuite.scala
@@ -136,45 +136,6 @@ class SequentialStreamingUnionAnalysisSuite extends AnalysisTest with DataTypeEr
       parameters = Map("operator" -> "SequentialStreamingUnion"))
   }
 
-  test("ValidateSequentialStreamingUnion - rejects directly nested unions") {
-    val streamingRelation1 = testRelation1.copy(isStreaming = true)
-    val streamingRelation2 = testRelation2.copy(isStreaming = true)
-    val streamingRelation3 = testRelation3.copy(isStreaming = true)
-
-    // Manually create a nested union without running flatten
-    // (In practice, flatten would handle this, but validation catches it as a safeguard)
-    val innerUnion = SequentialStreamingUnion(streamingRelation1, streamingRelation2)
-    val outerUnion = SequentialStreamingUnion(innerUnion, streamingRelation3)
-
-    checkError(
-      exception = intercept[AnalysisException] {
-        ValidateSequentialStreamingUnion(outerUnion)
-      },
-      condition = "NESTED_SEQUENTIAL_STREAMING_UNION",
-      parameters = Map(
-        "hint" -> "Use chained followedBy calls instead: df1.followedBy(df2).followedBy(df3)"))
-  }
-
-  test("ValidateSequentialStreamingUnion - rejects nested unions through other operators") {
-    val streamingRelation1 = testRelation1.copy(isStreaming = true)
-    val streamingRelation2 = testRelation2.copy(isStreaming = true)
-    val streamingRelation3 = testRelation3.copy(isStreaming = true)
-
-    // Create a nested union through a Project operator
-    // e.g., from df1.select("a", "b").followedBy(df2)
-    val innerUnion = SequentialStreamingUnion(streamingRelation1, streamingRelation2)
-    val projectOverUnion = Project(Seq($"a", $"b"), innerUnion)
-    val outerUnion = SequentialStreamingUnion(projectOverUnion, streamingRelation3)
-
-    checkError(
-      exception = intercept[AnalysisException] {
-        ValidateSequentialStreamingUnion(outerUnion)
-      },
-      condition = "NESTED_SEQUENTIAL_STREAMING_UNION",
-      parameters = Map(
-        "hint" -> "Use chained followedBy calls instead: df1.followedBy(df2).followedBy(df3)"))
-  }
-
   test("ValidateSequentialStreamingUnion - three or more children allowed") {
     val streamingRelation1 = testRelation1.copy(isStreaming = true)
     val streamingRelation2 = testRelation2.copy(isStreaming = true)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
index d3aa1e0cd37c..7194bdf30fc2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
@@ -413,4 +413,157 @@ class SetOperationSuite extends PlanTest {
         optimizedRelation4, optimizedRelation5))).select($"a").analyze)
 
   }
+
+  // SequentialStreamingUnion optimizer tests verify that CombineUnions correctly flattens
+  // nested SequentialStreamingUnions and applies other union optimizations while preserving
+  // child ordering. Flattening is an optimization, not a strict requirement - these tests
+  // verify the optimizer's expected behavior.
+
+  test("SequentialStreamingUnion: combine nested unions into one") {
+    // Mark relations as streaming for SequentialStreamingUnion
+    val streamRel1 = testRelation.copy(isStreaming = true)
+    val streamRel2 = testRelation2.copy(isStreaming = true)
+    val streamRel3 = testRelation3.copy(isStreaming = true)
+
+    val seqUnion1 = SequentialStreamingUnion(
+      SequentialStreamingUnion(streamRel1, streamRel2), streamRel3)
+    val seqUnion2 = SequentialStreamingUnion(
+      streamRel1, SequentialStreamingUnion(streamRel2, streamRel3))
+    val optimized1 = Optimize.execute(seqUnion1.analyze)
+    val optimized2 = Optimize.execute(seqUnion2.analyze)
+
+    // Both should flatten to the same 3-child SequentialStreamingUnion
+    val expected = SequentialStreamingUnion(
+      Seq(streamRel1, streamRel2, streamRel3), byName = false, allowMissingCol = false)
+    comparePlans(optimized1, expected.analyze)
+    comparePlans(optimized2, expected.analyze)
+  }
+
+  test("SequentialStreamingUnion: flatten nested unions under Distinct") {
+    val streamRel1 = testRelation.copy(isStreaming = true)
+    val streamRel2 = testRelation2.copy(isStreaming = true)
+    val streamRel3 = testRelation3.copy(isStreaming = true)
+
+    val seqUnion = SequentialStreamingUnion(
+      SequentialStreamingUnion(streamRel1, streamRel2), streamRel3)
+    val distinctSeqUnion = Distinct(seqUnion)
+    val optimized = Optimize.execute(distinctSeqUnion.analyze)
+
+    val expected = Distinct(SequentialStreamingUnion(
+      Seq(streamRel1, streamRel2, streamRel3), byName = false, allowMissingCol = false))
+    comparePlans(optimized, expected.analyze)
+  }
+
+  test("SequentialStreamingUnion: flatten nested unions under Deduplicate") {
+    val streamRel1 = testRelation.copy(isStreaming = true)
+    val streamRel2 = testRelation2.copy(isStreaming = true)
+    val streamRel3 = testRelation3.copy(isStreaming = true)
+
+    val seqUnion = SequentialStreamingUnion(
+      SequentialStreamingUnion(streamRel1, streamRel2), streamRel3)
+    val deduped = seqUnion.deduplicate($"a", $"b", $"c")
+    val optimized = Optimize.execute(deduped.analyze)
+
+    val expected = Deduplicate(
+      Seq($"a", $"b", $"c"),
+      SequentialStreamingUnion(
+        Seq(streamRel1, streamRel2, streamRel3), byName = false, allowMissingCol = false))
+    comparePlans(optimized, expected.analyze)
+  }
+
+  test("SequentialStreamingUnion: project to each side") {
+    val streamRel1 = testRelation.copy(isStreaming = true)
+    val streamRel2 = testRelation.copy(isStreaming = true)
+    val streamRel3 = testRelation.copy(isStreaming = true)
+
+    // Create union first, then project on top (like Union test at line 73)
+    val seqUnion = SequentialStreamingUnion(
+      Seq(streamRel1, streamRel2, streamRel3), byName = false, allowMissingCol = false)
+    val seqUnionQuery = seqUnion.select($"a")
+    val seqUnionOptimized = Optimize.execute(seqUnionQuery.analyze)
+
+    // Should push projection down to each child
+    val seqUnionCorrectAnswer =
+      SequentialStreamingUnion(
+        streamRel1.select($"a") ::
+        streamRel2.select($"a") ::
+        streamRel3.select($"a") :: Nil,
+        byName = false, allowMissingCol = false).analyze
+    comparePlans(seqUnionOptimized, seqUnionCorrectAnswer)
+  }
+
+  test("SequentialStreamingUnion: expressions in project list are pushed down") {
+    val streamRel1 = testRelation.copy(isStreaming = true)
+    val streamRel2 = testRelation.copy(isStreaming = true)
+
+    val seqUnion = SequentialStreamingUnion(streamRel1, streamRel2)
+    val seqUnionQuery = seqUnion.select(($"a" + $"b").as("ab"))
+    val seqUnionOptimized = Optimize.execute(seqUnionQuery.analyze)
+
+    val seqUnionCorrectAnswer =
+      SequentialStreamingUnion(
+        streamRel1.select(($"a" + $"b").as("ab")) ::
+        streamRel2.select(($"a" + $"b").as("ab")) :: Nil,
+        byName = false, allowMissingCol = false).analyze
+    comparePlans(seqUnionOptimized, seqUnionCorrectAnswer)
+  }
+
+  test("SequentialStreamingUnion: do not merge with regular Union") {
+    // Use the same base relation but mark instances as streaming or non-streaming
+    val streamRel1 = testRelation.copy(isStreaming = true)
+    val streamRel2 = testRelation.copy(isStreaming = true)
+    val batchRel1 = testRelation.copy(isStreaming = false)
+    val batchRel2 = testRelation.copy(isStreaming = false)
+
+    // Ensure SequentialStreamingUnion and regular Union don't flatten into each other
+    val seqUnion = SequentialStreamingUnion(streamRel1, streamRel2)
+    val batchUnion = Union(batchRel1, batchRel2)
+
+    // Create a query with both types of unions - they should remain separate
+    val combined = seqUnion.select($"a").union(batchUnion.select($"a"))
+    val optimized = Optimize.execute(combined.analyze)
+
+    // The SequentialStreamingUnion should push projections to its children
+    // and stay as a SequentialStreamingUnion. The regular Union's children
+    // will be flattened into the outer Union, but importantly, the
+    // SequentialStreamingUnion children should NOT be merged with batch children
+    val expectedSeqUnion = SequentialStreamingUnion(
+      streamRel1.select($"a") :: streamRel2.select($"a") :: Nil,
+      byName = false, allowMissingCol = false)
+    // The batch union gets flattened - its children become direct children of outer Union
+    // The optimizer adds aliases to maintain proper attribute references
+    val expected = Union(
+      expectedSeqUnion ::
+      batchRel1.select($"a".as("a")) ::
+      batchRel2.select($"a".as("a")) :: Nil,
+      byName = false, allowMissingCol = false)
+
+    comparePlans(optimized, expected.analyze)
+  }
+
+  test("SequentialStreamingUnion: optimizer preserves child ordering") {
+    // Create distinct streaming relations with different outputs to verify ordering
+    val streamRel1 = LocalRelation($"a".int, $"b".int).copy(isStreaming = true)
+    val streamRel2 = LocalRelation($"c".int, $"d".int).copy(isStreaming = true)
+    val streamRel3 = LocalRelation($"e".int, $"f".int).copy(isStreaming = true)
+
+    // Create SequentialStreamingUnion with a specific order
+    val seqUnion = SequentialStreamingUnion(
+      Seq(streamRel1, streamRel2, streamRel3), byName = false, allowMissingCol = false)
+
+    // Apply various optimizer rules by running full optimization
+    val optimized = Optimize.execute(seqUnion.analyze)
+
+    // Extract the children from the optimized plan
+    val optimizedUnion = optimized.asInstanceOf[SequentialStreamingUnion]
+
+    // Verify the children are in the exact same order
+    assert(optimizedUnion.children.length == 3, "Should have 3 children")
+    assert(optimizedUnion.children(0).output.map(_.name) == Seq("a", "b"),
+      "First child should be streamRel1 with columns a, b")
+    assert(optimizedUnion.children(1).output.map(_.name) == Seq("c", "d"),
+      "Second child should be streamRel2 with columns c, d")
+    assert(optimizedUnion.children(2).output.map(_.name) == Seq("e", "f"),
+      "Third child should be streamRel3 with columns e, f")
+  }
 }

