[ https://issues.apache.org/jira/browse/SPARK-38667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lars updated SPARK-38667:
-------------------------
Affects Version/s: 3.1.2
(was: 3.2.1)
> Optimizer generates error when using inner join along with sequence
> -------------------------------------------------------------------
>
> Key: SPARK-38667
> URL: https://issues.apache.org/jira/browse/SPARK-38667
> Project: Spark
> Issue Type: Bug
> Components: Optimizer
> Affects Versions: 3.1.2
> Reporter: Lars
> Priority: Major
>
> This issue occurred in a more complex scenario, so I've reduced it to a
> simple case.
> {*}Steps to reproduce{*}: Run the following example. The code should run
> without errors, but instead a *java.lang.IllegalArgumentException: Illegal
> sequence boundaries: 4 to 2 by 1* is thrown.
> {code:java}
> package com.example
>
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions._
>
> object SparkIssue {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession
>       .builder()
>       .master("local[*]")
>       .getOrCreate()
>
>     val dfA = spark
>       .createDataFrame(Seq((1, 1), (2, 4)))
>       .toDF("a1", "a2")
>
>     val dfB = spark
>       .createDataFrame(Seq((1, 5), (2, 2)))
>       .toDF("b1", "b2")
>
>     // Inner join on a1 = b1, keep only rows with a2 < b2,
>     // then emit one row per value x in a2..b2
>     dfA.join(dfB, dfA("a1") === dfB("b1"), "inner")
>       .where(col("a2") < col("b2"))
>       .withColumn("x", explode(sequence(col("a2"), col("b2"), lit(1))))
>       .show()
>
>     spark.stop()
>   }
> }
> {code}
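> When I look at the Optimized Logical Plan, I can see that the Filter has been
> merged into the Inner Join condition, together with an additional check for a
> non-empty sequence, size(sequence(a2, b2, 1)) > 0, placed in front of the
> original filter predicate (a2 < b2). The exception is thrown because this
> sequence check is evaluated before the filter: for the joined row with
> a1 = b1 = 2 we have a2 = 4 and b2 = 2, so sequence(4, 2, 1) is built even
> though a2 < b2 is false and the filter would have removed that row.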
> {code:java}
> == Parsed Logical Plan ==
> 'Project [a1#4, a2#5, b1#12, b2#13, explode(sequence('a2, 'b2, Some(1), None)) AS x#24]
> +- Filter (a2#5 < b2#13)
>    +- Join Inner, (a1#4 = b1#12)
>       :- Project [_1#0 AS a1#4, _2#1 AS a2#5]
>       :  +- LocalRelation [_1#0, _2#1]
>       +- Project [_1#8 AS b1#12, _2#9 AS b2#13]
>          +- LocalRelation [_1#8, _2#9]
>
> == Analyzed Logical Plan ==
> a1: int, a2: int, b1: int, b2: int, x: int
> Project [a1#4, a2#5, b1#12, b2#13, x#25]
> +- Generate explode(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin))), false, [x#25]
>    +- Filter (a2#5 < b2#13)
>       +- Join Inner, (a1#4 = b1#12)
>          :- Project [_1#0 AS a1#4, _2#1 AS a2#5]
>          :  +- LocalRelation [_1#0, _2#1]
>          +- Project [_1#8 AS b1#12, _2#9 AS b2#13]
>             +- LocalRelation [_1#8, _2#9]
>
> == Optimized Logical Plan ==
> Generate explode(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin))), false, [x#25]
> +- Join Inner, (((size(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin)), true) > 0) AND (a2#5 < b2#13)) AND (a1#4 = b1#12))
>    :- LocalRelation [a1#4, a2#5]
>    +- LocalRelation [b1#12, b2#13]
>
> == Physical Plan ==
> Generate explode(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin))), [a1#4, a2#5, b1#12, b2#13], false, [x#25]
> +- *(1) BroadcastHashJoin [a1#4], [b1#12], Inner, BuildRight, ((size(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin)), true) > 0) AND (a2#5 < b2#13)), false
>    :- *(1) LocalTableScan [a1#4, a2#5]
>    +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#15]
>       +- LocalTableScan [b1#12, b2#13]
> {code}
>
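The effect of the conjunct order in the optimized join condition can be illustrated without Spark. The following is a minimal plain-Scala model, not Spark code and not a fix: `sparkLikeSequence` is a hypothetical stand-in that only mimics the boundary check behind the "Illegal sequence boundaries" message, and the model assumes (as in the BroadcastHashJoin of the physical plan above) that the equi-join on a1 = b1 runs first and the residual condition is then evaluated left to right with short-circuiting. It evaluates the residual condition once in the order of the optimized plan and once with the user's filter first.

```scala
// Plain-Scala model of the reported failure; no Spark dependency.
object ConjunctOrderModel {
  // Input rows from the reproduction: (a1, a2) and (b1, b2).
  val dfA: Seq[(Int, Int)] = Seq((1, 1), (2, 4))
  val dfB: Seq[(Int, Int)] = Seq((1, 5), (2, 2))

  // Hypothetical stand-in for sequence(start, stop, step): it only mimics the
  // boundary check that produces "Illegal sequence boundaries" in the report.
  def sparkLikeSequence(start: Int, stop: Int, step: Int): Seq[Int] = {
    val legal =
      (step > 0 && start <= stop) || (step < 0 && start >= stop) || (step == 0 && start == stop)
    if (!legal)
      throw new IllegalArgumentException(s"Illegal sequence boundaries: $start to $stop by $step")
    if (step == 0) Seq(start) else Range.inclusive(start, stop, step)
  }

  // Equi-join on a1 = b1, assumed to happen before the residual condition.
  val joined: Seq[(Int, Int, Int, Int)] =
    for ((a1, a2) <- dfA; (b1, b2) <- dfB if a1 == b1) yield (a1, a2, b1, b2)

  // Residual condition in the order of the optimized plan:
  // (size(sequence(a2, b2, 1)) > 0) AND (a2 < b2)
  def optimizedOrder(a2: Int, b2: Int): Boolean =
    sparkLikeSequence(a2, b2, 1).size > 0 && a2 < b2

  // Same conjuncts with the user's filter first; && short-circuits,
  // so no sequence is built for rows where a2 >= b2.
  def filterFirstOrder(a2: Int, b2: Int): Boolean =
    a2 < b2 && sparkLikeSequence(a2, b2, 1).size > 0

  // Join, apply the residual condition, then explode a2..b2 into column x.
  // Returns Left(message) if the boundary check throws.
  def run(residual: (Int, Int) => Boolean): Either[String, Seq[(Int, Int, Int, Int, Int)]] =
    try {
      Right(for {
        (a1, a2, b1, b2) <- joined
        if residual(a2, b2)
        x <- sparkLikeSequence(a2, b2, 1)
      } yield (a1, a2, b1, b2, x))
    } catch {
      case e: IllegalArgumentException => Left(e.getMessage)
    }

  def main(args: Array[String]): Unit = {
    println(run(optimizedOrder))   // fails on the a1 = b1 = 2 row
    println(run(filterFirstOrder)) // only the a1 = b1 = 1 row, x = 1..5
  }
}
```

In this model the optimized order throws on the joined row (a1 = 2, a2 = 4, b1 = 2, b2 = 2), reproducing the reported message, while the filter-first order rejects that row before any sequence is built and returns five rows for a1 = b1 = 1.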
--
This message was sent by Atlassian Jira
(v8.20.7#820007)