[ 
https://issues.apache.org/jira/browse/SPARK-53275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Robbins updated SPARK-53275:
----------------------------------
    Description: 
For example:
{noformat}
# this particular example fails with 2 executors
bin/spark-shell --master "local[2]"

import org.apache.spark.sql.functions.udf
spark.udf.register("udf", (s: String) => s)

Seq((0, "2"), (0, "1")).toDF("a", "b").createOrReplaceTempView("v1")

// run in interpreted mode
sql("set spark.sql.codegen.factoryMode=NO_CODEGEN")

sql("select a, udf(b) from v1 order by a, udf(b) asc").show(false)
// returns the rows in the wrong order: with a tied at 0, ascending udf(b)
// should put 1 before 2
// +---+------+
// |a  |udf(b)|
// +---+------+
// |0  |2     |
// |0  |1     |
// +---+------+
{noformat}
Another example:
{noformat}
# this particular example fails with one executor
bin/spark-shell --master "local[1]"

import org.apache.spark.sql.functions.udf
spark.udf.register("udf", (s: String) => s)

Seq((676,676,"1"),
(4056,56,"1"),
(8788,788,"1"),
(0,0,"1"),
(780,780,"2"),
(9676,676,"3"),
(9568,568,"2")).toDF("a", "b", "c").createOrReplaceTempView("v1")

val query = """select /*+ MERGEJOIN(t2) */ t1.b, t2.a, t1.c, t2.c
from v1 t1
join v1 t2 on t1.b = t2.a and udf(t1.c) = t2.c"""

sql(query).show(false)
// the above returns
// +---+---+---+---+
// |b  |a  |c  |c  |
// +---+---+---+---+
// |0  |0  |1  |1  |
// |676|676|1  |1  |
// |780|780|2  |2  |
// +---+---+---+---+

// switch to interpreted mode
sql("set spark.sql.codegen.factoryMode=NO_CODEGEN")

sql(query).show(false)
// the above returns
// +---+---+---+---+
// |b  |a  |c  |c  |
// +---+---+---+---+
// |0  |0  |1  |1  |
// |780|780|2  |2  |
// +---+---+---+---+
//
// The row for b = 676 is missing due to incorrect sorting of the stream side.
{noformat}

  was:
For example:
{noformat}
# this particular example fails with 2 executors
bin/spark-shell --master "local[2]"

import org.apache.spark.sql.functions.udf
spark.udf.register("udf", (s: String) => s)

Seq((0, "2"), (0, "1")).toDF("a", "b").createOrReplaceTempView("v1")

// run in interpreted mode
sql("set spark.sql.codegen.factoryMode=NO_CODEGEN")

sql("select a, udf(b) from v1 order by a, udf(b)").show(false)
// returns
// +---+------+
// |a  |udf(b)|
// +---+------+
// |0  |2     |
// |0  |1     |
// +---+------+
{noformat}
Another example:
{noformat}
# this particular example fails with one executor
bin/spark-shell --master "local[1]"

import org.apache.spark.sql.functions.udf
spark.udf.register("udf", (s: String) => s)

Seq((676,676,"1"),
(4056,56,"1"),
(8788,788,"1"),
(0,0,"1"),
(780,780,"2"),
(9676,676,"3"),
(9568,568,"2")).toDF("a", "b", "c").createOrReplaceTempView("v1")

val query = """select /*+ MERGEJOIN(t2) */ t1.b, t2.a, t1.c, t2.c
from v1 t1
join v1 t2 on t1.b = t2.a and udf(t1.c) = t2.c"""

sql(query).show(false)
// the above returns
// +---+---+---+---+
// |b  |a  |c  |c  |
// +---+---+---+---+
// |0  |0  |1  |1  |
// |676|676|1  |1  |
// |780|780|2  |2  |
// +---+---+---+---+

// switch to interpreted mode
sql("set spark.sql.codegen.factoryMode=NO_CODEGEN")

sql(query).show(false)
// the above returns
// +---+---+---+---+
// |b  |a  |c  |c  |
// +---+---+---+---+
// |0  |0  |1  |1  |
// |780|780|2  |2  |
// +---+---+---+---+
//
// The row for b = 676 is missing due to incorrect sorting of the stream side.
{noformat}


> Incorrect ordering in interpreted mode when sort order includes stateful 
> expressions
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-53275
>                 URL: https://issues.apache.org/jira/browse/SPARK-53275
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.5.6
>            Reporter: Bruce Robbins
>            Priority: Major
>
> For example:
> {noformat}
> # this particular example fails with 2 executors
> bin/spark-shell --master "local[2]"
> import org.apache.spark.sql.functions.udf
> spark.udf.register("udf", (s: String) => s)
> Seq((0, "2"), (0, "1")).toDF("a", "b").createOrReplaceTempView("v1")
> // run in interpreted mode
> sql("set spark.sql.codegen.factoryMode=NO_CODEGEN")
> sql("select a, udf(b) from v1 order by a, udf(b) asc").show(false)
> // returns the rows in the wrong order: with a tied at 0, ascending udf(b)
> // should put 1 before 2
> // +---+------+
> // |a  |udf(b)|
> // +---+------+
> // |0  |2     |
> // |0  |1     |
> // +---+------+
> {noformat}
> Another example:
> {noformat}
> # this particular example fails with one executor
> bin/spark-shell --master "local[1]"
> import org.apache.spark.sql.functions.udf
> spark.udf.register("udf", (s: String) => s)
> Seq((676,676,"1"),
> (4056,56,"1"),
> (8788,788,"1"),
> (0,0,"1"),
> (780,780,"2"),
> (9676,676,"3"),
> (9568,568,"2")).toDF("a", "b", "c").createOrReplaceTempView("v1")
> val query = """select /*+ MERGEJOIN(t2) */ t1.b, t2.a, t1.c, t2.c
> from v1 t1
> join v1 t2 on t1.b = t2.a and udf(t1.c) = t2.c"""
> sql(query).show(false)
> // the above returns
> // +---+---+---+---+
> // |b  |a  |c  |c  |
> // +---+---+---+---+
> // |0  |0  |1  |1  |
> // |676|676|1  |1  |
> // |780|780|2  |2  |
> // +---+---+---+---+
> // switch to interpreted mode
> sql("set spark.sql.codegen.factoryMode=NO_CODEGEN")
> sql(query).show(false)
> // the above returns
> // +---+---+---+---+
> // |b  |a  |c  |c  |
> // +---+---+---+---+
> // |0  |0  |1  |1  |
> // |780|780|2  |2  |
> // +---+---+---+---+
> //
> // The row for b = 676 is missing due to incorrect sorting of the stream side.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to