This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 7e5d59289870 [SPARK-47305][SQL] Fix PruneFilters to tag the
isStreaming flag of LocalRelation correctly when the plan has both batch and
streaming
7e5d59289870 is described below
commit 7e5d5928987069e255da94f8dd8b0cd7696a773b
Author: Jungtaek Lim <[email protected]>
AuthorDate: Thu Mar 7 15:11:15 2024 +0900
[SPARK-47305][SQL] Fix PruneFilters to tag the isStreaming flag of
LocalRelation correctly when the plan has both batch and streaming
### What changes were proposed in this pull request?
This PR proposes to fix PruneFilters to tag the isStreaming flag of
LocalRelation correctly when the plan has both batch and streaming.
### Why are the changes needed?
When a filter condition always evaluates to false (or null), PruneFilters
replaces the filter with an empty LocalRelation, which effectively prunes the
filter. The logic takes care to carry over the isStreaming flag, but in some
cases carries it over incorrectly. It picks up the value of the isStreaming flag
from the root node rather than from the filter (or its child).
A node's isStreaming flag is true if the isStreaming flag of any of its
children is true. Conversely, some children of such a node might have the
isStreaming flag set to "false". Suppose the filter being pruned is a descendant
of such a batch child (in other words, it has no streaming node beneath it). In
that case the LocalRelation is incorrectly tagged as streaming, when it should
be batch.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a new unit test verifying the fix. The new test fails without this PR and
passes with it.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #45406 from HeartSaVioR/SPARK-47305.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit 8d6bd9bbd29da6023e5740b622e12c7e1f8581ce)
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../spark/sql/catalyst/optimizer/Optimizer.scala | 4 +-
.../optimizer/PropagateEmptyRelationSuite.scala | 43 +++++++++++++++++++++-
2 files changed, 43 insertions(+), 4 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 7aebf7c28f11..3d774af1ce33 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1636,9 +1636,9 @@ object PruneFilters extends Rule[LogicalPlan] with
PredicateHelper {
// If the filter condition always evaluate to null or false,
// replace the input with an empty relation.
case Filter(Literal(null, _), child) =>
- LocalRelation(child.output, data = Seq.empty, isStreaming =
plan.isStreaming)
+ LocalRelation(child.output, data = Seq.empty, isStreaming =
child.isStreaming)
case Filter(Literal(false, BooleanType), child) =>
- LocalRelation(child.output, data = Seq.empty, isStreaming =
plan.isStreaming)
+ LocalRelation(child.output, data = Seq.empty, isStreaming =
child.isStreaming)
// If any deterministic condition is guaranteed to be true given the
constraints on the child's
// output, remove the condition
case f @ Filter(fc, p: LogicalPlan) =>
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
index fe45e02c67fa..a1132eadcc6f 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
@@ -21,10 +21,10 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Literal, UnspecifiedFrame}
+import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal,
UnspecifiedFrame}
import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.{Expand, LocalRelation,
LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter,
LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StructType}
@@ -221,6 +221,45 @@ class PropagateEmptyRelationSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
+ test("SPARK-47305 correctly tag isStreaming when propagating empty relation
" +
+ "with the plan containing batch and streaming") {
+ val data = Seq(Row(1))
+
+ val outputForStream = Seq($"a".int)
+ val schemaForStream = DataTypeUtils.fromAttributes(outputForStream)
+ val converterForStream =
CatalystTypeConverters.createToCatalystConverter(schemaForStream)
+
+ val outputForBatch = Seq($"b".int)
+ val schemaForBatch = DataTypeUtils.fromAttributes(outputForBatch)
+ val converterForBatch =
CatalystTypeConverters.createToCatalystConverter(schemaForBatch)
+
+ val streamingRelation = LocalRelation(
+ outputForStream,
+ data.map(converterForStream(_).asInstanceOf[InternalRow]),
+ isStreaming = true)
+ val batchRelation = LocalRelation(
+ outputForBatch,
+ data.map(converterForBatch(_).asInstanceOf[InternalRow]),
+ isStreaming = false)
+
+ val query = streamingRelation
+ .join(batchRelation.where(false).select($"b"), LeftOuter,
+ Some(EqualTo($"a", $"b")))
+
+ val analyzedQuery = query.analyze
+
+ val optimized = Optimize.execute(analyzedQuery)
+ // This is to deal with analysis for join condition. We expect the
analyzed plan to contain
+ // filter and projection in batch relation, and know they will go away
after optimization.
+ // The point to check here is that the node is replaced with "empty"
LocalRelation, but the
+ // flag `isStreaming` is properly propagated.
+ val correctAnswer = analyzedQuery transform {
+ case Project(_, Filter(_, l: LocalRelation)) => l.copy(data = Seq.empty)
+ }
+
+ comparePlans(optimized, correctAnswer)
+ }
+
test("don't propagate empty streaming relation through agg") {
val output = Seq($"a".int)
val data = Seq(Row(1))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]