[ https://issues.apache.org/jira/browse/SPARK-27784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16851588#comment-16851588 ]
Dongjoon Hyun commented on SPARK-27784:
---------------------------------------
Is this still in `branch-2.4`, too?
> Alias ID reuse can break correctness when substituting foldable expressions
> ---------------------------------------------------------------------------
>
> Key: SPARK-27784
> URL: https://issues.apache.org/jira/browse/SPARK-27784
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.1.1, 2.3.2
> Reporter: Ryan Blue
> Priority: Major
> Labels: correctness
>
> This is a correctness bug when reusing a set of project expressions in the
> DataFrame API.
> Use case: a user was migrating a table to a new version with an additional
> column ("data" in the repro case). To migrate, the user unions the old table
> ("t2") with the new table ("t1") and applies a common set of projections to
> ensure the union doesn't hit an issue with column ordering (SPARK-22335). In
> some cases, this produces an incorrect query plan:
> {code:java}
> Seq((4, "a"), (5, "b"), (6, "c")).toDF("id", "data").write.saveAsTable("t1")
> Seq(1, 2, 3).toDF("id").write.saveAsTable("t2")
> val dim = Seq(2, 3, 4).toDF("id")
> val outputCols = Seq($"id", coalesce($"data", lit("_")).as("data"))
> val t1 = spark.table("t1").select(outputCols:_*)
> val t2 = spark.table("t2").withColumn("data", lit(null)).select(outputCols:_*)
> t1.join(dim, t1("id") === dim("id"))
>   .select(t1("id"), t1("data"))
>   .union(t2)
>   .explain(true){code}
> {code:java}
> == Physical Plan ==
> Union
> :- *Project [id#330, _ AS data#237] <--- THE CONSTANT IS FROM THE OTHER SIDE OF THE UNION
> :  +- *BroadcastHashJoin [id#330], [id#234], Inner, BuildRight
> :     :- *Project [id#330]
> :     :  +- *Filter isnotnull(id#330)
> :     :     +- *FileScan parquet t1[id#330] Batched: true, Format: Parquet, Location: CatalogFileIndex[s3://.../t1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:int>
> :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
> :        +- LocalTableScan [id#234]
> +- *Project [id#340, _ AS data#237]
>    +- *FileScan parquet t2[id#340] Batched: true, Format: Parquet, Location: CatalogFileIndex[s3://.../t2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>{code}
> The problem happens because "outputCols" has an alias. The ID for that alias
> is created when the projection Seq is created, so it is reused in both sides
> of the union.
> When FoldablePropagation runs, it identifies that "data" in the t2 side of
> the union is a foldable expression and replaces all references to it,
> including the references in the t1 side of the union.
> The join to a dimension table is necessary to reproduce the problem because
> it requires a Projection on top of the join that uses an AttributeReference
> for data#237. Otherwise, the projections are collapsed and the projection
> includes an Alias that does not get rewritten by FoldablePropagation.
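
To make the mechanism described above concrete, here is a minimal toy model in plain Scala. It does not use Spark: `Expr`, `Alias`, `AttrRef`, `foldableById`, `propagate`, and `simulate` are all hypothetical stand-ins invented for this sketch, and the real FoldablePropagation rule is more involved. It only illustrates how a substitution keyed on alias ID alone can leak a constant from one side of a union into the other when both sides share an ID.

```scala
import java.util.concurrent.atomic.AtomicLong

// Toy model only: hypothetical stand-ins, not Spark's Catalyst classes.
object AliasIdReuseSketch {
  private val nextId = new AtomicLong(0L)

  sealed trait Expr
  final case class Literal(value: String) extends Expr
  final case class Coalesce(column: String, default: Literal) extends Expr
  // An alias draws its ID once, at construction, unless one is passed in.
  final case class Alias(child: Expr, name: String,
                         id: Long = nextId.getAndIncrement()) extends Expr
  final case class AttrRef(name: String, id: Long) extends Expr

  // Collect aliases whose child is a constant, keyed by alias ID only.
  def foldableById(exprs: Seq[Expr]): Map[Long, Literal] =
    exprs.collect { case Alias(lit: Literal, _, id) => id -> lit }.toMap

  // Replace every attribute reference whose ID matches a foldable alias.
  def propagate(exprs: Seq[Expr], foldables: Map[Long, Literal]): Seq[Expr] =
    exprs.map {
      case ref @ AttrRef(_, id) => foldables.getOrElse(id, ref): Expr
      case other                => other
    }

  def simulate(mkAlias: Expr => Alias): Seq[Expr] = {
    // t1 side: the alias wraps a non-constant coalesce, and the projection
    // above the join refers to it by ID.
    val t1Alias = mkAlias(Coalesce("data", Literal("_")))
    val t1Top: Seq[Expr] = Seq(AttrRef("data", t1Alias.id))
    // t2 side: "data" is lit(null), so the coalesce folds to the constant "_".
    val t2Alias = mkAlias(Literal("_"))
    // Foldables found on the t2 side are applied to the t1 projection.
    propagate(t1Top, foldableById(Seq(t2Alias)))
  }

  // Same ID on both sides, as when one projection Seq is built once and reused.
  private val sharedId = nextId.getAndIncrement()
  val reused: Seq[Expr] = simulate(child => Alias(child, "data", sharedId))

  // A new ID per alias, as when the projection is rebuilt for each side.
  val fresh: Seq[Expr] = simulate(child => Alias(child, "data"))

  def main(args: Array[String]): Unit = {
    println(s"reused ID: $reused")
    println(s"fresh IDs: $fresh")
  }
}
```

With the shared ID, the t1 projection ends up holding the constant from the t2 side; with fresh IDs, the attribute reference is left alone. By analogy, declaring `outputCols` in the repro with `def` instead of `val` would presumably give each side its own alias ID, but that is an untested assumption and not a fix confirmed in this ticket.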