This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d14346437b21 [SPARK-55471] Add optimizer support for
SequentialStreamingUnion
d14346437b21 is described below
commit d14346437b21c4ad21e808a89c180b497cf144b7
Author: ericm-db <[email protected]>
AuthorDate: Tue Feb 17 21:30:28 2026 +0800
[SPARK-55471] Add optimizer support for SequentialStreamingUnion
### What changes were proposed in this pull request?
This PR adds optimizer support for `SequentialStreamingUnion` to enable the
same query optimizations that regular `Union` nodes receive. Specifically:
1. **Validation refactoring**: Moved nesting validation from analysis phase
to post-optimization phase
- Removed `validateNoNesting` from `ValidateSequentialStreamingUnion`
(analysis rule)
- Added new `ValidateSequentialStreamingUnionNesting` rule that runs
after the optimizer's "Union" batch
- This allows the optimizer to flatten nested unions before validating
them
2. **Optimizer enhancements**:
- Extended `PushProjectionThroughUnion` to support
`SequentialStreamingUnion` by:
- Creating a generic `pushProjectionThroughUnionLike()` method that
works with any `UnionBase` subtype
- Adding pattern matching for `SequentialStreamingUnion` to push
projections down to children
- Extended `CombineUnions` to support `SequentialStreamingUnion` by:
- Adding `flattenSequentialStreamingUnion()` method (mirrors
`flattenUnion()`)
- Supporting flattening through `Distinct`, `Deduplicate`, and
`DeduplicateWithinWatermark`
- Enabling nested union flattening and projection pushdown through
wrapping operators
### Why are the changes needed?
`SequentialStreamingUnion` is a specialized union operator for streaming
queries that maintains source ordering and has specific constraints (no
stateful operations, streaming sources only). However, it was missing optimizer
support that regular `Union` nodes have.
Without these optimizations:
- Nested `SequentialStreamingUnion` nodes created through multiple
`followedBy()` calls would be rejected at analysis time, even though they could
be flattened
- Projections above `SequentialStreamingUnion` wouldn't be pushed down,
leading to unnecessary data processing
- Users couldn't compose complex streaming queries with the same
flexibility as batch queries
This PR enables better query optimization for streaming workloads using
`SequentialStreamingUnion`.
### Does this PR introduce _any_ user-facing change?
No. The changes are internal optimizer improvements that maintain the same
semantics. Users may see:
- Better performance due to projection pushdown and union flattening
- More flexible query patterns (e.g., projections over nested followedBy
calls) that now optimize correctly
The validation change is not user-facing because the nesting validation
still rejects nested unions, just at a later phase (after optimization attempts
flattening).
### How was this patch tested?
1. **Updated existing tests**: Modified
`SequentialStreamingUnionAnalysisSuite` to reflect that nesting validation now
runs post-optimization rather than during analysis
2. **Added new optimizer tests** in `SetOperationSuite`:
- `SequentialStreamingUnion: combine nested unions into one` - verifies
flattening of nested unions
- `SequentialStreamingUnion: flatten through Distinct` - verifies
flattening through Distinct operator
- `SequentialStreamingUnion: flatten through Deduplicate` - verifies
flattening through Deduplicate operator
- `SequentialStreamingUnion: project to each side` - verifies projection
pushdown to all children
- `SequentialStreamingUnion: expressions in project list are pushed
down` - verifies expression pushdown
All tests pass with the new optimizer rules in place.
### Was this patch authored or co-authored using generative AI tooling?
Claude Opus 4.6
Closes #54236 from ericm-db/sequential-union-optimizer-support.
Authored-by: ericm-db <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../analysis/SequentialUnionAnalysis.scala | 21 ++-
.../spark/sql/catalyst/optimizer/Optimizer.scala | 79 ++++++-----
.../plans/logical/SequentialStreamingUnion.scala | 3 +
.../plans/logical/basicLogicalOperators.scala | 55 ++++++++
.../SequentialStreamingUnionAnalysisSuite.scala | 39 ------
.../sql/catalyst/optimizer/SetOperationSuite.scala | 153 +++++++++++++++++++++
6 files changed, 265 insertions(+), 85 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SequentialUnionAnalysis.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SequentialUnionAnalysis.scala
index 3b0e3ede9cbc..c60d4f380256 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SequentialUnionAnalysis.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SequentialUnionAnalysis.scala
@@ -23,8 +23,12 @@ import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.errors.QueryCompilationErrors
/**
- * Flattens nested SequentialStreamingUnion nodes into a single level.
+ * Flattens directly nested SequentialStreamingUnion nodes into a single level.
* This allows chaining: df1.followedBy(df2).followedBy(df3)
+ *
+ * Note: This only handles direct nesting where a SequentialStreamingUnion is
an immediate child.
+ * Nesting wrapped in other operators (e.g.,
Project(SequentialStreamingUnion(...))) is handled
+ * by the optimizer's CombineUnions rule.
*/
object FlattenSequentialStreamingUnion extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsUpWithPruning(
@@ -36,19 +40,20 @@ object FlattenSequentialStreamingUnion extends
Rule[LogicalPlan] {
}
/**
- * Validates SequentialStreamingUnion constraints:
+ * Validates SequentialStreamingUnion constraints during analysis:
* - All children must be streaming relations
- * - No nested SequentialStreamingUnions (should be flattened first)
* - No stateful operations in any child subtrees
*
* Note: Minimum 2 children is enforced by the resolved property, not explicit
validation.
+ * Note: Nested SequentialStreamingUnions are flattened by analysis
(FlattenSequentialStreamingUnion
+ * for direct nesting) and optimizer (CombineUnions for wrapped
nesting). Tests verify this
+ * flattening behavior.
*/
object ValidateSequentialStreamingUnion extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
plan.foreach {
case su: SequentialStreamingUnion =>
validateAllStreaming(su)
- validateNoNesting(su)
validateNoStatefulDescendants(su)
case _ =>
}
@@ -62,14 +67,6 @@ object ValidateSequentialStreamingUnion extends
Rule[LogicalPlan] {
}
}
- private def validateNoNesting(su: SequentialStreamingUnion): Unit = {
- su.children.foreach { child =>
- if (child.containsPattern(SEQUENTIAL_STREAMING_UNION)) {
- throw QueryCompilationErrors.nestedSequentialStreamingUnionError()
- }
- }
- }
-
private def validateNoStatefulDescendants(su: SequentialStreamingUnion):
Unit = {
su.children.foreach { child =>
if (child.exists(UnsupportedOperationChecker.isStatefulOperation)) {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 125db2752b20..712d02ac8eca 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1018,7 +1018,9 @@ object PushProjectionThroughUnion extends
Rule[LogicalPlan] {
project.outputSet.size == project.projectList.size
}
- def pushProjectionThroughUnion(projectList: Seq[NamedExpression], u: Union):
Seq[LogicalPlan] = {
+ def pushProjectionThroughUnion(
+ projectList: Seq[NamedExpression],
+ u: UnionBase): Seq[LogicalPlan] = {
val newFirstChild = Project(projectList, u.children.head)
val newOtherChildren = u.children.tail.map { child =>
val rewrites = buildRewrites(u.children.head, child)
@@ -1028,13 +1030,15 @@ object PushProjectionThroughUnion extends
Rule[LogicalPlan] {
}
def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
- _.containsAllPatterns(UNION, PROJECT)) {
+ t => t.containsPattern(PROJECT) &&
+ t.containsAnyPattern(UNION, SEQUENTIAL_STREAMING_UNION)) {
- // Push down deterministic projection through UNION ALL
- case project @ Project(projectList, u: Union)
+ // Push down deterministic projection through Union or
SequentialStreamingUnion.
+ // This is safe because it preserves child ordering.
+ case project @ Project(projectList, SequentialOrSimpleUnion(u))
if projectList.forall(_.deterministic) && u.children.nonEmpty &&
canPushProjectionThroughUnion(project) =>
- u.copy(children = pushProjectionThroughUnion(projectList, u))
+ u.withNewChildren(pushProjectionThroughUnion(projectList, u))
}
}
@@ -1814,20 +1818,30 @@ object CombineUnions extends Rule[LogicalPlan] {
import PushProjectionThroughUnion.{canPushProjectionThroughUnion,
pushProjectionThroughUnion}
def apply(plan: LogicalPlan): LogicalPlan = plan.transformDownWithPruning(
- _.containsAnyPattern(UNION, DISTINCT_LIKE), ruleId) {
- case u: Union => flattenUnion(u, false)
- case Distinct(u: Union) => Distinct(flattenUnion(u, true))
+ _.containsAnyPattern(UNION, SEQUENTIAL_STREAMING_UNION, DISTINCT_LIKE),
ruleId) {
+ // Flatten Union or SequentialStreamingUnion.
+ // This is safe because flattening preserves child ordering.
+ case SequentialOrSimpleUnion(u) => flattenUnion(u, false)
+ case Distinct(SequentialOrSimpleUnion(u)) => Distinct(flattenUnion(u,
true))
// Only handle distinct-like 'Deduplicate', where the keys == output
- case Deduplicate(keys: Seq[Attribute], u: Union) if AttributeSet(keys) ==
u.outputSet =>
+ case Deduplicate(keys: Seq[Attribute], SequentialOrSimpleUnion(u))
+ if AttributeSet(keys) == u.outputSet =>
Deduplicate(keys, flattenUnion(u, true))
- case DeduplicateWithinWatermark(keys: Seq[Attribute], u: Union)
+ case DeduplicateWithinWatermark(keys: Seq[Attribute],
SequentialOrSimpleUnion(u))
if AttributeSet(keys) == u.outputSet =>
DeduplicateWithinWatermark(keys, flattenUnion(u, true))
}
- private def flattenUnion(union: Union, flattenDistinct: Boolean): Union = {
- val topByName = union.byName
- val topAllowMissingCol = union.allowMissingCol
+ private def flattenUnion(union: UnionBase, flattenDistinct: Boolean):
UnionBase = {
+ val topByName = SequentialOrSimpleUnion.byName(union)
+ val topAllowMissingCol = SequentialOrSimpleUnion.allowMissingCol(union)
+
+ // Helper to check if a union can be merged with the top-level union
+ def canMerge(u: UnionBase): Boolean = {
+ SequentialOrSimpleUnion.isSameType(union, u) &&
+ SequentialOrSimpleUnion.byName(u) == topByName &&
+ SequentialOrSimpleUnion.allowMissingCol(u) == topAllowMissingCol
+ }
val stack = mutable.Stack[LogicalPlan](union)
val flattened = mutable.ArrayBuffer.empty[LogicalPlan]
@@ -1843,38 +1857,35 @@ object CombineUnions extends Rule[LogicalPlan] {
!p2.projectList.exists(SubqueryExpression.hasCorrelatedSubquery)
=>
val newProjectList = buildCleanedProjectList(p1.projectList,
p2.projectList)
stack.pushAll(Seq(p2.copy(projectList = newProjectList)))
- case Distinct(Union(children, byName, allowMissingCol))
- if flattenDistinct && byName == topByName && allowMissingCol ==
topAllowMissingCol =>
- stack.pushAll(children.reverse)
+ case Distinct(SequentialOrSimpleUnion(u)) if flattenDistinct &&
canMerge(u) =>
+ stack.pushAll(u.children.reverse)
// Only handle distinct-like 'Deduplicate', where the keys == output
- case Deduplicate(keys: Seq[Attribute], u: Union)
- if flattenDistinct && u.byName == topByName &&
- u.allowMissingCol == topAllowMissingCol && AttributeSet(keys) ==
u.outputSet =>
+ case Deduplicate(keys: Seq[Attribute], SequentialOrSimpleUnion(u))
+ if flattenDistinct && canMerge(u) && AttributeSet(keys) ==
u.outputSet =>
stack.pushAll(u.children.reverse)
- case Union(children, byName, allowMissingCol)
- if byName == topByName && allowMissingCol == topAllowMissingCol =>
- stack.pushAll(children.reverse)
- // Push down projection through Union and then push pushed plan to
Stack if
+ case SequentialOrSimpleUnion(u) if canMerge(u) =>
+ stack.pushAll(u.children.reverse)
+ // Push down projection through union and then push pushed plan to
Stack if
// there is a Project.
- case project @ Project(projectList, Distinct(u @ Union(children,
byName, allowMissingCol)))
- if projectList.forall(_.deterministic) && children.nonEmpty &&
- flattenDistinct && byName == topByName && allowMissingCol ==
topAllowMissingCol &&
+ case project @ Project(projectList,
Distinct(SequentialOrSimpleUnion(u)))
+ if projectList.forall(_.deterministic) && u.children.nonEmpty &&
+ flattenDistinct && canMerge(u) &&
canPushProjectionThroughUnion(project) =>
stack.pushAll(pushProjectionThroughUnion(projectList, u).reverse)
- case project @ Project(projectList, Deduplicate(keys: Seq[Attribute],
u: Union))
- if projectList.forall(_.deterministic) && flattenDistinct &&
u.byName == topByName &&
- u.allowMissingCol == topAllowMissingCol && AttributeSet(keys) ==
u.outputSet &&
- canPushProjectionThroughUnion(project) =>
+ case project @ Project(
+ projectList, Deduplicate(keys: Seq[Attribute],
SequentialOrSimpleUnion(u)))
+ if projectList.forall(_.deterministic) && flattenDistinct &&
canMerge(u) &&
+ AttributeSet(keys) == u.outputSet &&
canPushProjectionThroughUnion(project) =>
stack.pushAll(pushProjectionThroughUnion(projectList, u).reverse)
- case project @ Project(projectList, u @ Union(children, byName,
allowMissingCol))
- if projectList.forall(_.deterministic) && children.nonEmpty &&
byName == topByName &&
- allowMissingCol == topAllowMissingCol &&
canPushProjectionThroughUnion(project) =>
+ case project @ Project(projectList, SequentialOrSimpleUnion(u))
+ if projectList.forall(_.deterministic) && u.children.nonEmpty &&
+ canMerge(u) && canPushProjectionThroughUnion(project) =>
stack.pushAll(pushProjectionThroughUnion(projectList, u).reverse)
case child =>
flattened += child
}
}
- union.copy(children = flattened.toSeq)
+ SequentialOrSimpleUnion.withNewChildren(union, flattened.toSeq)
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SequentialStreamingUnion.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SequentialStreamingUnion.scala
index e2f1a1aed5d3..8917d6e8e61a 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SequentialStreamingUnion.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SequentialStreamingUnion.scala
@@ -35,6 +35,9 @@ import org.apache.spark.sql.catalyst.trees.TreePattern._
* 2. Second child begins processing
* 3. And so on...
*
+ * IMPORTANT: Child ordering IS semantically significant. Children are
processed sequentially
+ * in the exact order specified. Optimizer rules must preserve this ordering.
+ *
* Requirements:
* - Minimum 2 children required
* - All children must be streaming sources
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 6b1b068234de..3fa7a4f4685e 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -566,9 +566,64 @@ abstract class UnionBase extends LogicalPlan {
}
}
+/**
+ * Extractor and helper methods for Union and SequentialStreamingUnion.
+ * Does not match other UnionBase subtypes like UnionLoop.
+ */
+object SequentialOrSimpleUnion {
+ /**
+ * Extractor that matches Union and SequentialStreamingUnion for optimizer
rules.
+ */
+ def unapply(plan: LogicalPlan): Option[UnionBase] = plan match {
+ case u: Union => Some(u)
+ case u: SequentialStreamingUnion => Some(u)
+ case _ => None
+ }
+
+ /**
+ * Returns true if both unions are the same concrete type.
+ * Used during flattening to ensure Union and SequentialStreamingUnion are
not merged.
+ */
+ def isSameType(u1: UnionBase, u2: UnionBase): Boolean = (u1, u2) match {
+ case (_: Union, _: Union) => true
+ case (_: SequentialStreamingUnion, _: SequentialStreamingUnion) => true
+ case _ => false
+ }
+
+ /**
+ * Extracts byName flag from Union or SequentialStreamingUnion.
+ */
+ def byName(u: UnionBase): Boolean = u match {
+ case union: Union => union.byName
+ case ssu: SequentialStreamingUnion => ssu.byName
+ }
+
+ /**
+ * Extracts allowMissingCol flag from Union or SequentialStreamingUnion.
+ */
+ def allowMissingCol(u: UnionBase): Boolean = u match {
+ case union: Union => union.allowMissingCol
+ case ssu: SequentialStreamingUnion => ssu.allowMissingCol
+ }
+
+ /**
+ * Creates a new union of the same type with the specified children.
+ * This is needed when the number of children may change (e.g., flattening)
and we need
+ * to preserve the UnionBase return type rather than getting back
LogicalPlan.
+ */
+ def withNewChildren(u: UnionBase, newChildren: Seq[LogicalPlan]): UnionBase
= u match {
+ case union: Union => union.copy(children = newChildren)
+ case ssu: SequentialStreamingUnion => ssu.copy(children = newChildren)
+ }
+}
+
/**
* Logical plan for unioning multiple plans, without a distinct. This is UNION
ALL in SQL.
*
+ * NOTE: Child ordering is NOT semantically significant. Children are
processed in parallel
+ * and their order does not affect the result. This allows Union-specific
optimizations to
+ * reorder children (e.g., for performance), unlike SequentialStreamingUnion
where order matters.
+ *
* @param byName Whether resolves columns in the children by column
names.
* @param allowMissingCol Allows missing columns in children query plans. If
it is true,
* this function allows different set of column names
between two Datasets.
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SequentialStreamingUnionAnalysisSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SequentialStreamingUnionAnalysisSuite.scala
index 2cfe2d271043..56915fc0d86b 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SequentialStreamingUnionAnalysisSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SequentialStreamingUnionAnalysisSuite.scala
@@ -136,45 +136,6 @@ class SequentialStreamingUnionAnalysisSuite extends
AnalysisTest with DataTypeEr
parameters = Map("operator" -> "SequentialStreamingUnion"))
}
- test("ValidateSequentialStreamingUnion - rejects directly nested unions") {
- val streamingRelation1 = testRelation1.copy(isStreaming = true)
- val streamingRelation2 = testRelation2.copy(isStreaming = true)
- val streamingRelation3 = testRelation3.copy(isStreaming = true)
-
- // Manually create a nested union without running flatten
- // (In practice, flatten would handle this, but validation catches it as a
safeguard)
- val innerUnion = SequentialStreamingUnion(streamingRelation1,
streamingRelation2)
- val outerUnion = SequentialStreamingUnion(innerUnion, streamingRelation3)
-
- checkError(
- exception = intercept[AnalysisException] {
- ValidateSequentialStreamingUnion(outerUnion)
- },
- condition = "NESTED_SEQUENTIAL_STREAMING_UNION",
- parameters = Map(
- "hint" -> "Use chained followedBy calls instead:
df1.followedBy(df2).followedBy(df3)"))
- }
-
- test("ValidateSequentialStreamingUnion - rejects nested unions through other
operators") {
- val streamingRelation1 = testRelation1.copy(isStreaming = true)
- val streamingRelation2 = testRelation2.copy(isStreaming = true)
- val streamingRelation3 = testRelation3.copy(isStreaming = true)
-
- // Create a nested union through a Project operator
- // e.g., from df1.select("a", "b").followedBy(df2)
- val innerUnion = SequentialStreamingUnion(streamingRelation1,
streamingRelation2)
- val projectOverUnion = Project(Seq($"a", $"b"), innerUnion)
- val outerUnion = SequentialStreamingUnion(projectOverUnion,
streamingRelation3)
-
- checkError(
- exception = intercept[AnalysisException] {
- ValidateSequentialStreamingUnion(outerUnion)
- },
- condition = "NESTED_SEQUENTIAL_STREAMING_UNION",
- parameters = Map(
- "hint" -> "Use chained followedBy calls instead:
df1.followedBy(df2).followedBy(df3)"))
- }
-
test("ValidateSequentialStreamingUnion - three or more children allowed") {
val streamingRelation1 = testRelation1.copy(isStreaming = true)
val streamingRelation2 = testRelation2.copy(isStreaming = true)
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
index d3aa1e0cd37c..7194bdf30fc2 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
@@ -413,4 +413,157 @@ class SetOperationSuite extends PlanTest {
optimizedRelation4, optimizedRelation5))).select($"a").analyze)
}
+
+ // SequentialStreamingUnion optimizer tests verify that CombineUnions
correctly flattens
+ // nested SequentialStreamingUnions and applies other union optimizations
while preserving
+ // child ordering. Flattening is an optimization, not a strict requirement -
these tests
+ // verify the optimizer's expected behavior.
+
+ test("SequentialStreamingUnion: combine nested unions into one") {
+ // Mark relations as streaming for SequentialStreamingUnion
+ val streamRel1 = testRelation.copy(isStreaming = true)
+ val streamRel2 = testRelation2.copy(isStreaming = true)
+ val streamRel3 = testRelation3.copy(isStreaming = true)
+
+ val seqUnion1 = SequentialStreamingUnion(
+ SequentialStreamingUnion(streamRel1, streamRel2), streamRel3)
+ val seqUnion2 = SequentialStreamingUnion(
+ streamRel1, SequentialStreamingUnion(streamRel2, streamRel3))
+ val optimized1 = Optimize.execute(seqUnion1.analyze)
+ val optimized2 = Optimize.execute(seqUnion2.analyze)
+
+ // Both should flatten to the same 3-child SequentialStreamingUnion
+ val expected = SequentialStreamingUnion(
+ Seq(streamRel1, streamRel2, streamRel3), byName = false, allowMissingCol
= false)
+ comparePlans(optimized1, expected.analyze)
+ comparePlans(optimized2, expected.analyze)
+ }
+
+ test("SequentialStreamingUnion: flatten nested unions under Distinct") {
+ val streamRel1 = testRelation.copy(isStreaming = true)
+ val streamRel2 = testRelation2.copy(isStreaming = true)
+ val streamRel3 = testRelation3.copy(isStreaming = true)
+
+ val seqUnion = SequentialStreamingUnion(
+ SequentialStreamingUnion(streamRel1, streamRel2), streamRel3)
+ val distinctSeqUnion = Distinct(seqUnion)
+ val optimized = Optimize.execute(distinctSeqUnion.analyze)
+
+ val expected = Distinct(SequentialStreamingUnion(
+ Seq(streamRel1, streamRel2, streamRel3), byName = false, allowMissingCol
= false))
+ comparePlans(optimized, expected.analyze)
+ }
+
+ test("SequentialStreamingUnion: flatten nested unions under Deduplicate") {
+ val streamRel1 = testRelation.copy(isStreaming = true)
+ val streamRel2 = testRelation2.copy(isStreaming = true)
+ val streamRel3 = testRelation3.copy(isStreaming = true)
+
+ val seqUnion = SequentialStreamingUnion(
+ SequentialStreamingUnion(streamRel1, streamRel2), streamRel3)
+ val deduped = seqUnion.deduplicate($"a", $"b", $"c")
+ val optimized = Optimize.execute(deduped.analyze)
+
+ val expected = Deduplicate(
+ Seq($"a", $"b", $"c"),
+ SequentialStreamingUnion(
+ Seq(streamRel1, streamRel2, streamRel3), byName = false,
allowMissingCol = false))
+ comparePlans(optimized, expected.analyze)
+ }
+
+ test("SequentialStreamingUnion: project to each side") {
+ val streamRel1 = testRelation.copy(isStreaming = true)
+ val streamRel2 = testRelation.copy(isStreaming = true)
+ val streamRel3 = testRelation.copy(isStreaming = true)
+
+ // Create union first, then project on top (like Union test at line 73)
+ val seqUnion = SequentialStreamingUnion(
+ Seq(streamRel1, streamRel2, streamRel3), byName = false, allowMissingCol
= false)
+ val seqUnionQuery = seqUnion.select($"a")
+ val seqUnionOptimized = Optimize.execute(seqUnionQuery.analyze)
+
+ // Should push projection down to each child
+ val seqUnionCorrectAnswer =
+ SequentialStreamingUnion(
+ streamRel1.select($"a") ::
+ streamRel2.select($"a") ::
+ streamRel3.select($"a") :: Nil,
+ byName = false, allowMissingCol = false).analyze
+ comparePlans(seqUnionOptimized, seqUnionCorrectAnswer)
+ }
+
+ test("SequentialStreamingUnion: expressions in project list are pushed
down") {
+ val streamRel1 = testRelation.copy(isStreaming = true)
+ val streamRel2 = testRelation.copy(isStreaming = true)
+
+ val seqUnion = SequentialStreamingUnion(streamRel1, streamRel2)
+ val seqUnionQuery = seqUnion.select(($"a" + $"b").as("ab"))
+ val seqUnionOptimized = Optimize.execute(seqUnionQuery.analyze)
+
+ val seqUnionCorrectAnswer =
+ SequentialStreamingUnion(
+ streamRel1.select(($"a" + $"b").as("ab")) ::
+ streamRel2.select(($"a" + $"b").as("ab")) :: Nil,
+ byName = false, allowMissingCol = false).analyze
+ comparePlans(seqUnionOptimized, seqUnionCorrectAnswer)
+ }
+
+ test("SequentialStreamingUnion: do not merge with regular Union") {
+ // Use the same base relation but mark instances as streaming or
non-streaming
+ val streamRel1 = testRelation.copy(isStreaming = true)
+ val streamRel2 = testRelation.copy(isStreaming = true)
+ val batchRel1 = testRelation.copy(isStreaming = false)
+ val batchRel2 = testRelation.copy(isStreaming = false)
+
+ // Ensure SequentialStreamingUnion and regular Union don't flatten into
each other
+ val seqUnion = SequentialStreamingUnion(streamRel1, streamRel2)
+ val batchUnion = Union(batchRel1, batchRel2)
+
+ // Create a query with both types of unions - they should remain separate
+ val combined = seqUnion.select($"a").union(batchUnion.select($"a"))
+ val optimized = Optimize.execute(combined.analyze)
+
+ // The SequentialStreamingUnion should push projections to its children
+ // and stay as a SequentialStreamingUnion. The regular Union's children
+ // will be flattened into the outer Union, but importantly, the
+ // SequentialStreamingUnion children should NOT be merged with batch
children
+ val expectedSeqUnion = SequentialStreamingUnion(
+ streamRel1.select($"a") :: streamRel2.select($"a") :: Nil,
+ byName = false, allowMissingCol = false)
+ // The batch union gets flattened - its children become direct children of
outer Union
+ // The optimizer adds aliases to maintain proper attribute references
+ val expected = Union(
+ expectedSeqUnion ::
+ batchRel1.select($"a".as("a")) ::
+ batchRel2.select($"a".as("a")) :: Nil,
+ byName = false, allowMissingCol = false)
+
+ comparePlans(optimized, expected.analyze)
+ }
+
+ test("SequentialStreamingUnion: optimizer preserves child ordering") {
+ // Create distinct streaming relations with different outputs to verify
ordering
+ val streamRel1 = LocalRelation($"a".int, $"b".int).copy(isStreaming = true)
+ val streamRel2 = LocalRelation($"c".int, $"d".int).copy(isStreaming = true)
+ val streamRel3 = LocalRelation($"e".int, $"f".int).copy(isStreaming = true)
+
+ // Create SequentialStreamingUnion with a specific order
+ val seqUnion = SequentialStreamingUnion(
+ Seq(streamRel1, streamRel2, streamRel3), byName = false, allowMissingCol
= false)
+
+ // Apply various optimizer rules by running full optimization
+ val optimized = Optimize.execute(seqUnion.analyze)
+
+ // Extract the children from the optimized plan
+ val optimizedUnion = optimized.asInstanceOf[SequentialStreamingUnion]
+
+ // Verify the children are in the exact same order
+ assert(optimizedUnion.children.length == 3, "Should have 3 children")
+ assert(optimizedUnion.children(0).output.map(_.name) == Seq("a", "b"),
+ "First child should be streamRel1 with columns a, b")
+ assert(optimizedUnion.children(1).output.map(_.name) == Seq("c", "d"),
+ "Second child should be streamRel2 with columns c, d")
+ assert(optimizedUnion.children(2).output.map(_.name) == Seq("e", "f"),
+ "Third child should be streamRel3 with columns e, f")
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]