Pablo García created SPARK-56729:
------------------------------------
Summary: ReplaceData and WriteDelta should implement SupportsNonDeterministicExpression to allow MERGE INTO with non-deterministic source plans
Key: SPARK-56729
URL: https://issues.apache.org/jira/browse/SPARK-56729
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 4.0.0, 3.5.6, 3.5.0
Reporter: Pablo García
h2. Problem
{{MERGE INTO}} queries fail with {{INVALID_NON_DETERMINISTIC_EXPRESSIONS}} when the source plan contains any non-deterministic expression (e.g. {{uuid()}}, {{current_timestamp()}}, {{input_file_name()}}, or non-deterministic expressions introduced by upstream ML inference pipelines). This regression was introduced in Spark 3.5, when the {{RewriteMergeIntoTable}} logic was moved into Spark core.
The same queries work correctly on Spark 3.3.
h2. Root cause
In Spark 3.5, {{buildReplaceDataPlan}} wraps the source plan in an {{Exists}} subquery via {{toGroupFilterCondition}} and stores that {{Exists}} as the {{groupFilterCondition}} of the {{ReplaceData}} node.
{{PlanExpression.deterministic}} propagates non-determinism from the subquery plan to the expression itself, so whenever the source plan is non-deterministic, the {{Exists}} is too, and {{ReplaceData}} ends up holding a non-deterministic expression.
{{CheckAnalysis}} then rejects {{ReplaceData}} because it is not in the allow-list of operators that tolerate non-deterministic expressions (such as {{Project}}, {{Filter}}, {{Aggregate}}, {{Window}}, {{Expand}}, {{Generate}}, and {{LateralJoin}}).
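To make the propagation chain concrete, here is a toy Scala model of it. None of these classes are Spark's: {{Expr}}, {{Attr}}, {{Uuid}}, {{Exists}}, {{Plan}}, {{Relation}}, {{Project}}, {{ReplaceData}}, {{ToyCheckAnalysis}} and {{Demo}} are hypothetical, simplified stand-ins that encode only the two rules described above (a subquery expression is only as deterministic as its plan, and operators outside the allow-list may not hold non-deterministic expressions). The {{Exists}} here also omits the join-condition {{Filter}} and outer references that {{toGroupFilterCondition}} adds.
{code:scala}
// Toy model of the failure. Simplified, hypothetical stand-ins, NOT Spark's catalyst classes.
sealed trait Expr { def deterministic: Boolean }
final case class Attr(name: String) extends Expr { val deterministic: Boolean = true }
case object Uuid extends Expr { val deterministic: Boolean = false } // stands in for uuid()

sealed trait Plan {
  def expressions: Seq[Expr]
  def children: Seq[Plan]
  // A plan is deterministic only if its own expressions and all its children are.
  def deterministic: Boolean =
    expressions.forall(_.deterministic) && children.forall(_.deterministic)
}

// Like PlanExpression: a subquery expression is only as deterministic as its plan.
final case class Exists(plan: Plan) extends Expr {
  def deterministic: Boolean = plan.deterministic
}

final case class Relation(name: String) extends Plan {
  val expressions: Seq[Expr] = Nil
  val children: Seq[Plan] = Nil
}
final case class Project(projectList: Seq[Expr], child: Plan) extends Plan {
  val expressions: Seq[Expr] = projectList
  val children: Seq[Plan] = Seq(child)
}
// Only the group filter condition and the write query are modelled.
final case class ReplaceData(groupFilterCondition: Option[Expr], query: Plan) extends Plan {
  val expressions: Seq[Expr] = groupFilterCondition.toSeq
  val children: Seq[Plan] = Seq(query)
}

object ToyCheckAnalysis {
  // Toy allow-list: only Project may hold non-deterministic expressions here.
  private def allowed(plan: Plan): Boolean = plan.isInstanceOf[Project]

  // Checks children first, then the operator itself; returns the first error found.
  def check(plan: Plan): Option[String] =
    plan.children.iterator.map(check).collectFirst { case Some(err) => err }.orElse {
      if (plan.expressions.exists(!_.deterministic) && !allowed(plan))
        Some("INVALID_NON_DETERMINISTIC_EXPRESSIONS")
      else None
    }
}

object Demo {
  // A source plan carrying a uuid() column.
  val source: Plan = Project(Seq(Attr("id"), Uuid), Relation("source_view"))
  // Roughly what toGroupFilterCondition produces: the source wrapped in Exists.
  val replaceData: Plan = ReplaceData(Some(Exists(source)), source)
}
{code}
In this model {{ToyCheckAnalysis.check(Demo.source)}} returns {{None}} (a {{Project}} may hold {{Uuid}}), while {{ToyCheckAnalysis.check(Demo.replaceData)}} returns the error. Dropping {{Uuid}} from the source makes both checks pass, matching the report that only non-deterministic source plans are affected.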
h2. Proposed fix
SPARK-48871 introduced the {{SupportsNonDeterministicExpression}} trait for
exactly this class of problem, and {{CheckAnalysis}} already respects it:
{code:scala}
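// Excerpt from CheckAnalysis: an operator that mixes in SupportsNonDeterministicExpression
// is exempt from the INVALID_NON_DETERMINISTIC_EXPRESSIONS check when its flag is true.
private def operatorAllowsNonDeterministicExpressions(plan: LogicalPlan): Boolean = {
  plan match {
    case p: SupportsNonDeterministicExpression =>
      p.allowNonDeterministicExpression
    case _ => false
  }
}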
{code}
However, {{ReplaceData}} (used for copy-on-write MERGE) and {{WriteDelta}}
(used for merge-on-read MERGE) do not implement this trait.
The fix is to have both {{ReplaceData}} and {{WriteDelta}} extend
{{SupportsNonDeterministicExpression}} with
{{allowNonDeterministicExpression = true}}.
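A toy Scala sketch of the proposed change follows. It is not a patch against Spark: {{Expr}}, {{Uuid}}, {{Plan}}, {{ReplaceDataToday}}, {{ReplaceDataProposed}} and {{ToyCheckAnalysis}} are hypothetical, simplified stand-ins, and the trait is re-declared locally with only the {{allowNonDeterministicExpression}} member that the {{CheckAnalysis}} excerpt relies on. It assumes the real change is a plain mixin on the existing case classes; {{WriteDelta}} would receive the same mixin.
{code:scala}
// Toy model of the proposed fix. Simplified, hypothetical stand-ins, NOT Spark's catalyst classes.
trait Expr { def deterministic: Boolean }
case object Uuid extends Expr { val deterministic: Boolean = false } // stands in for uuid()

trait Plan { def expressions: Seq[Expr] }

// Same shape as the trait consulted by CheckAnalysis: a single Boolean flag.
trait SupportsNonDeterministicExpression extends Plan {
  def allowNonDeterministicExpression: Boolean
}

// Current behaviour: ReplaceData does not opt in.
final case class ReplaceDataToday(groupFilterCondition: Option[Expr]) extends Plan {
  def expressions: Seq[Expr] = groupFilterCondition.toSeq
}

// Proposed behaviour: ReplaceData mixes in the trait with the flag set to true.
final case class ReplaceDataProposed(groupFilterCondition: Option[Expr])
    extends SupportsNonDeterministicExpression {
  def expressions: Seq[Expr] = groupFilterCondition.toSeq
  def allowNonDeterministicExpression: Boolean = true
}

object ToyCheckAnalysis {
  // Same pattern match as the CheckAnalysis excerpt.
  private def operatorAllowsNonDeterministicExpressions(plan: Plan): Boolean =
    plan match {
      case p: SupportsNonDeterministicExpression => p.allowNonDeterministicExpression
      case _ => false
    }

  // An operator passes if all its expressions are deterministic or it opts out of the check.
  def passes(plan: Plan): Boolean =
    plan.expressions.forall(_.deterministic) || operatorAllowsNonDeterministicExpressions(plan)
}
{code}
One trade-off to keep in mind: with the flag hard-coded to {{true}}, the exemption covers every expression on the node, not only the {{Exists}} group filter, so any other non-deterministic expression placed directly on {{ReplaceData}} or {{WriteDelta}} would also pass {{CheckAnalysis}}.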
h2. Reproduction
Any {{MERGE INTO}} on a DSv2 table (e.g. Apache Iceberg) where the source
plan contains a non-deterministic expression:
{code:sql}
-- source_view was built with uuid() or current_timestamp()
MERGE INTO target USING source_view
ON target.id = source_view.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
{code}
Error:
{noformat}
AnalysisException: [INVALID_NON_DETERMINISTIC_EXPRESSIONS]
The operator expects a deterministic expression...
{noformat}
h2. References
* SPARK-48871 (introduced {{SupportsNonDeterministicExpression}})
* [apache/iceberg#14585|https://github.com/apache/iceberg/issues/14585] (multiple users affected across Glue 4/5, Iceberg 1.7-1.10)