Asif created SPARK-55185:
----------------------------
Summary: Adding rule InferFiltersFromConstraints to the Batch
"Operator Optimization after Inferring Filters" causes idempotency break
Key: SPARK-55185
URL: https://issues.apache.org/jira/browse/SPARK-55185
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 4.1.1, 4.2.0
Reporter: Asif
The fix for bug SPARK-55072 requires the *InferFiltersFromConstraints* rule to run as
part of the batch
Batch("Operator Optimization after Inferring Filters", fixedPoint,
operatorOptimizationRuleSet: _*).
But if *InferFiltersFromConstraints* runs as part of the fixed-point iteration,
idempotency is not achieved in some special cases (as seen in the test
below).
{quote}
test("SPARK-55072: Bug fix needs this test to pass as that is dependent change " +
  "tested here - 2") {
  val rel1 = LocalRelation(
    Seq($"a".int, $"b".int),
    InternalRow(1, 1) :: InternalRow(2, 1) :: InternalRow(3, 3) :: InternalRow(7, 8) ::
      InternalRow(5, 6) :: Nil)
  // One union leg that produces only NULLs for both columns.
  val nullRel = Project(
    Seq(
      Alias(Literal(null, IntegerType), "a")(),
      Alias(Literal(null, IntegerType), "b")()),
    OneRowRelation())
  val distinct = Aggregate(Seq($"a", $"b"), Seq($"a"), nullRel.union(rel1))
  val agg = Aggregate(Seq($"a"), Seq(sum($"a").as("aggFunctionAlias"), $"a"),
    distinct).analyze
  val rel2 = LocalRelation(
    Seq($"c".int, $"d".int),
    InternalRow(1, 1) :: InternalRow(2, 1) :: InternalRow(3, 3) :: InternalRow(6, 6) ::
      InternalRow(7, 7) :: InternalRow(9, 9) :: Nil).analyze
  val join = rel2.join(agg, condition =
    Some(Cast($"d", LongType) === $"aggFunctionAlias" && $"a" === $"c")).analyze
  val optimizer = new SimpleTestOptimizer()
  val batches = optimizer.defaultBatches
  val indexBeforeNewFilterInfer =
    batches.indexWhere(_.name == "Operator Optimization before Inferring Filters")
  val indexAfterNewFilterInfer =
    batches.indexWhere(_.name == "Operator Optimization after Inferring Filters")
  assert(indexAfterNewFilterInfer != -1 && indexBeforeNewFilterInfer != -1)
  // Ensure that the InferFiltersFromConstraints rule is present in the batch
  // "Operator Optimization after Inferring Filters"; add it if it is missing.
  val batchOfInterest = batches(indexAfterNewFilterInfer)
  val optimizerToUse = if (!batchOfInterest.rules.exists(
      _.ruleName == InferFiltersFromConstraints.ruleName)) {
    new SimpleTestOptimizer() {
      override def defaultBatches: Seq[Batch] = {
        val mutableBatches = super.defaultBatches.toBuffer
        val afterInferBatch = mutableBatches(indexAfterNewFilterInfer)
        val mutableRules = afterInferBatch.rules.toBuffer
        val newRules = mutableRules.append(InferFiltersFromConstraints).toSeq
        val newAfterInferBatch = new Batch(afterInferBatch.name,
          afterInferBatch.strategy,
          newRules: _*)
        mutableBatches(indexAfterNewFilterInfer) = newAfterInferBatch
        mutableBatches.toSeq
      }
    }
  } else {
    optimizer
  }
  optimizerToUse.execute(join)
}
{quote}
The above test is also added as a bug test.
The issue is a complex interaction between the *PushDownPredicates*,
*InferFiltersFromConstraints* and *PruneFilters* rules, involving a Union node.
Ideally, the fix should be such that adding or removing any single rule does not,
in itself, cause a problem in achieving idempotency.
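To make the notion of a fixed-point batch failing to settle concrete, here is a minimal, self-contained Scala sketch. It is a toy model, not Spark code: `Plan`, `pushDown`, `naiveInfer`, `guardedInfer` and `runToFixedPoint` are hypothetical names invented for illustration, and the way the toy rules interact is an assumption, not the actual mechanism behind this bug.

```scala
import scala.annotation.tailrec

object FixedPointSketch {
  // Toy plan: filters sitting above a node, and filters already pushed below it.
  final case class Plan(above: List[String], below: List[String])
  type Rule = Plan => Plan

  // Hypothetical stand-in for a push-down rule: move every filter below the node.
  val pushDown: Rule = p => Plan(Nil, p.below ++ p.above)

  // Hypothetical inference rule that re-creates a filter above the node
  // whenever none is present there, even if the same filter exists below.
  val naiveInfer: Rule = p =>
    if (p.above.isEmpty && p.below.nonEmpty) p.copy(above = List(p.below.head)) else p

  // Variant that never re-creates a filter that already exists below the node.
  val guardedInfer: Rule = p => p

  // Apply all rules in order, repeating until a full pass leaves the plan
  // unchanged. Returns None if the plan is still changing after maxIterations.
  def runToFixedPoint(rules: Seq[Rule], start: Plan, maxIterations: Int): Option[Plan] = {
    @tailrec
    def loop(current: Plan, remaining: Int): Option[Plan] =
      if (remaining == 0) None
      else {
        val next = rules.foldLeft(current)((p, rule) => rule(p))
        if (next == current) Some(current) else loop(next, remaining - 1)
      }
    loop(start, maxIterations)
  }

  def main(args: Array[String]): Unit = {
    val start = Plan(List("f"), Nil)
    // Settles: the filter is pushed down once and nothing re-creates it.
    println(runToFixedPoint(Seq(pushDown, guardedInfer), start, 10))
    // Never settles: every pass pushes the filter down and infers it again.
    println(runToFixedPoint(Seq(pushDown, naiveInfer), start, 10))
  }
}
```

In this toy, each rule looks reasonable on its own, but the pair keeps changing the plan on every pass, which is the kind of behaviour the test above is meant to catch.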
But as seen in this case,
the PruneFilters rule modifies the plan such that an empty relation is
created within one leg of the Union.
As a result, only one leg of the Union matters, so the Union can be
replaced directly with its non-empty child leg.
This task is usually done by the PropagateEmptyRelation rule.
But since PropagateEmptyRelation is not invoked after PruneFilters,
the empty relation remains, which causes a behaviour where PushDownPredicates
pushes a filter down, while InferFiltersFromConstraints materializes the
new filter again.
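The Union behaviour described above can be sketched with a small toy plan tree. This is an illustration only, not Spark's implementation: `pruneFilters` and `propagateEmpty` are simplified, hypothetical analogues of PruneFilters and PropagateEmptyRelation, and the sketch assumes that the filter on the all-NULL leg has already been reduced to an always-false condition.

```scala
object EmptyLegSketch {
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  case object Empty extends Plan
  // alwaysFalse = true models a filter whose condition can never hold.
  final case class Filter(alwaysFalse: Boolean, child: Plan) extends Plan
  final case class Union(children: List[Plan]) extends Plan

  // Toy analogue of PruneFilters: an always-false filter yields no rows.
  def pruneFilters(plan: Plan): Plan = plan match {
    case Filter(true, _)      => Empty
    case Filter(false, child) => Filter(false, pruneFilters(child))
    case Union(children)      => Union(children.map(pruneFilters))
    case leaf                 => leaf
  }

  // Toy analogue of PropagateEmptyRelation: drop empty Union legs and
  // replace a Union that has a single remaining leg with that leg.
  def propagateEmpty(plan: Plan): Plan = plan match {
    case Union(children) =>
      children.map(propagateEmpty).filter(_ != Empty) match {
        case Nil         => Empty
        case only :: Nil => only
        case many        => Union(many)
      }
    case Filter(cond, child) =>
      propagateEmpty(child) match {
        case Empty => Empty
        case other => Filter(cond, other)
      }
    case leaf => leaf
  }

  def main(args: Array[String]): Unit = {
    val plan: Plan = Union(List(Filter(true, Relation("nullRel")), Relation("rel1")))
    val pruned = pruneFilters(plan)
    println(pruned)                 // the Union still carries an Empty leg
    println(propagateEmpty(pruned)) // the Union collapses to its non-empty leg
  }
}
```

After `pruneFilters` alone the Union keeps a dead leg that later rules still have to reason about; running `propagateEmpty` right afterwards removes the Union entirely in this toy.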
I feel that invoking PropagateEmptyRelation after PruneFilters is the safe
approach, rather than tweaking the Union node's constraint logic, which can
become ugly.