[
https://issues.apache.org/jira/browse/SPARK-44512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17746025#comment-17746025
]
Yiu-Chung Lee commented on SPARK-44512:
---------------------------------------
[^Test.java]
(Attached the code)
To compile: javac Test.java && jar cvf Test.jar Test.class
To reproduce the bug: spark-submit --class Test Test.jar
No bug with the workaround enabled: spark-submit --class Test Test.jar workaround
No bug with AQE disabled either: spark-submit --conf
spark.sql.adaptive.enabled=false --class Test Test.jar
(in this case there are 3 output files under each partition key)
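
To tell whether a given run reproduced the bug, the output files can be
inspected after each of the commands above. The following is only a minimal
sketch and is not part of the attached Test.java. The class name
SortedOutputCheck and its methods are hypothetical. It assumes that the part
files of one partition directory (for example output/_2=a) are meant to be read
in file-name order, and it relies on the fact that in this test dataset _3
(p, q, r) increases together with the sort key _1 (1, 2, 3), so a sorted result
shows up as lexically ascending lines.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of the attached Test.java.
final class SortedOutputCheck {

    // Reads all part files of one partition directory (e.g. output/_2=a),
    // concatenated in file-name order.
    // Assumption: file-name order of the part files reflects the intended
    // order of the rows (part-00000 before part-00001, and so on).
    public static List<String> readPartition(Path partitionDir) throws IOException {
        List<Path> parts;
        try (Stream<Path> files = Files.list(partitionDir)) {
            parts = files
                .filter(p -> p.getFileName().toString().startsWith("part-"))
                .sorted()
                .collect(Collectors.toList());
        }
        List<String> lines = new ArrayList<>();
        for (Path part : parts) {
            lines.addAll(Files.readAllLines(part));
        }
        return lines;
    }

    // Returns true if the lines are in non-decreasing lexical order.
    // This matches "sorted by _1" only for the test data in this issue.
    public static boolean isSorted(List<String> lines) {
        for (int i = 1; i < lines.size(); i++) {
            if (lines.get(i - 1).compareTo(lines.get(i)) > 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the "output" directory written by Test.java.
        Path output = Path.of(args.length > 0 ? args[0] : "output");
        List<Path> dirs;
        try (Stream<Path> children = Files.list(output)) {
            dirs = children.filter(Files::isDirectory).sorted().collect(Collectors.toList());
        }
        for (Path dir : dirs) {
            List<String> lines = readPartition(dir);
            System.out.println(dir.getFileName() + ": " + lines
                + (isSorted(lines) ? " (sorted)" : " (NOT sorted)"));
        }
    }
}
```

With Java 11 or later it can be run directly from source, e.g.
java SortedOutputCheck.java output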
> dataset.sort.select.write.partitionBy does not return a sorted output if AQE
> is enabled
> ---------------------------------------------------------------------------------------
>
> Key: SPARK-44512
> URL: https://issues.apache.org/jira/browse/SPARK-44512
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.4.1
> Environment: Code that replicates the problem:
>
> import java.io.IOException;
> import java.util.List;
> import java.util.function.Function;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.spark.api.java.function.MapFunction;
> import org.apache.spark.sql.Encoders;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> import scala.Tuple3;
> // To compile: javac Test.java && jar cvf Test.jar Test.class
> // bug: spark-submit --class Test Test.jar
> // no bug: spark-submit --class Test Test.jar workaround
> // no bug: spark-submit --conf spark.sql.adaptive.enabled=false --class Test
> //         Test.jar (3 output files in each partition key)
> public class Test {
>     public static void main(String[] args) throws IOException {
>         final var spark = SparkSession
>             .builder()
>             .config("mapreduce.fileoutputcommitter.marksuccessfuljobs",
>                 "false")
>             .getOrCreate();
>         final var hadoopConf = spark.sparkContext().hadoopConfiguration();
>         final var fs = FileSystem.get(hadoopConf);
>         fs.setWriteChecksum(false);
>         // create a minimal dataset that is enough to reproduce the bug
>         // The three columns are named _1, _2, and _3 (the field names of
>         // Tuple3)
>         var dataset = spark.createDataset(List.of(
>                 new Tuple3<Long, String, String>(3L, "a", "r"),
>                 new Tuple3<Long, String, String>(3L, "b", "r"),
>                 new Tuple3<Long, String, String>(2L, "b", "q"),
>                 new Tuple3<Long, String, String>(2L, "a", "q"),
>                 new Tuple3<Long, String, String>(1L, "a", "p"),
>                 new Tuple3<Long, String, String>(1L, "b", "p")
>             ),
>             Encoders.tuple(Encoders.LONG(), Encoders.STRING(),
>                 Encoders.STRING()))
>             .sort("_1")
>             .select("_2", "_3");
>         // This is an identity mapper, i.e. it returns each row unchanged
>         // Enabled by adding an argument "workaround" when executing
>         // spark-submit.
>         // With AQE enabled, .sort() will work as intended only if this
>         // identity mapper is inserted between .select() and .write()
>         // in the pipeline
>         if (args.length > 0 && args[0].equals("workaround")) {
>             dataset = dataset.map((MapFunction<Row, Row>) row -> row,
>                 dataset.encoder());
>         }
>         // output column _3 to text files, partitioned by column _2
>         // _1 is only for sorting purpose, not used in output
>         // output will not be sorted without the identity mapper
>         dataset.write()
>             .mode("overwrite")
>             .partitionBy("_2")
>             .text("output");
>         dataset.explain();
>         spark.close();
>     }
> }
> Reporter: Yiu-Chung Lee
> Priority: Major
> Attachments: Test.java
>
>
> (In this example the dataset is of type Tuple3, and the columns are named _1,
> _2 and _3)
>
> I found that when AQE is enabled, the following code does not produce a
> sorted output:
> {{dataset.sort("_1")}}
> {{.select("_2", "_3")}}
> {{.write()}}
> {{.partitionBy("_2")}}
> {{.text("output");}}
>
> However, if I insert an identity mapper between select and write, the output
> is sorted as expected:
> {{dataset.sort("_1")}}
> {{.select("_2", "_3")}}
> {{.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}}
> {{.write()}}
> {{.partitionBy("_2")}}
> {{.text("output")}}
> The complete code that replicates the problem is given in the Environment
> field above.