[
https://issues.apache.org/jira/browse/SPARK-44512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785350#comment-17785350
]
Dongjoon Hyun commented on SPARK-44512:
---------------------------------------
I reproduced [~leeyc0]'s report like the following.
*APACHE SPARK 3.5.0*
{code}
scala> spark.createDataset(Seq((3L, "a", "r"), (3L, "b", "r"), (2L, "b", "q"),
(2L, "a", "q"), (1L, "a", "p"), (1L, "b", "p"))).sort("_1").select("_2",
"_3").write.mode("overwrite").partitionBy("_2").text("/tmp/t")
$ cat t/_2=a/*
r
q
p
{code}
{code}
scala> sql("set spark.sql.optimizer.plannedWrite.enabled=false")
scala> spark.createDataset(Seq((3L, "a", "r"), (3L, "b", "r"), (2L, "b", "q"),
(2L, "a", "q"), (1L, "a", "p"), (1L, "b", "p"))).sort("_1").select("_2",
"_3").write.mode("overwrite").partitionBy("_2").text("/tmp/t")
$ cat t/_2=a/*
p
q
r
{code}
*APACHE SPARK 3.4.1*
{code}
scala> spark.createDataset(Seq((3L, "a", "r"), (3L, "b", "r"), (2L, "b", "q"),
(2L, "a", "q"), (1L, "a", "p"), (1L, "b", "p"))).sort("_1").select("_2",
"_3").write.mode("overwrite").partitionBy("_2").text("/tmp/t")
$ cat t/_2=a/*
r
q
p
{code}
{code}
scala> sql("set spark.sql.optimizer.plannedWrite.enabled=false")
scala> spark.createDataset(Seq((3L, "a", "r"), (3L, "b", "r"), (2L, "b", "q"),
(2L, "a", "q"), (1L, "a", "p"), (1L, "b", "p"))).sort("_1").select("_2",
"_3").write.mode("overwrite").partitionBy("_2").text("/tmp/t")
$ cat t/_2=a/*
p
q
r
{code}
*APACHE SPARK 3.3.3*
{code}
scala> spark.createDataset(Seq((3L, "a", "r"), (3L, "b", "r"), (2L, "b", "q"),
(2L, "a", "q"), (1L, "a", "p"), (1L, "b", "p"))).sort("_1").select("_2",
"_3").write.mode("overwrite").partitionBy("_2").text("/tmp/t")
$ cat t/_2=a/*
p
q
r
{code}
> dataset.sort.select.write.partitionBy sorts wrong column
> --------------------------------------------------------
>
> Key: SPARK-44512
> URL: https://issues.apache.org/jira/browse/SPARK-44512
> Project: Spark
> Issue Type: Bug
> Components: Optimizer, SQL
> Affects Versions: 3.4.1
> Reporter: Yiu-Chung Lee
> Priority: Blocker
> Labels: correctness
> Attachments: Test-Details-for-Query-0.png,
> Test-Details-for-Query-1.png
>
>
> (In this example the dataset is of type Tuple3, and the columns are named _1,
> _2 and _3)
>
> I found -then when AQE is enabled,- that the following code does not produce
> sorted output (.drop() also have the same problem), unless
> spark.sql.optimizer.plannedWrite.enabled is set to false.
> After further investigation, spark actually sorted wrong column in the
> following code.
> {{dataset.sort("_1")}}
> {{.select("_2", "_3")}}
> {{.write()}}
> {{.partitionBy("_2")}}
> {{.text("output");}}
>
> (the following workaround is no longer necessary)
> -However, if I insert an identity mapper between select and write, the output
> would be sorted as expected.-
> -{{dataset = dataset.sort("_1")}}-
> -{{.select("_2", "_3");}}-
> -{{dataset.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}}-
> -{{.write()}}-
> -{{{}.{}}}{{{}partitionBy("_2"){}}}-
> -{{.text("output")}}-
> Below is the complete code that reproduces the problem.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]