[ 
https://issues.apache.org/jira/browse/SPARK-44512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17746025#comment-17746025
 ] 

Yiu-Chung Lee commented on SPARK-44512:
---------------------------------------

[^Test.java]
(Attached the code)

To compile: javac Test.java && jar cvf Test.jar Test.class
bug reproduce: spark-submit --class Test Test.jar
no bug if workaround is enabled: spark-submit --class Test Test.jar workaround
no bug too if AQE is disabled: spark-submit --conf 
spark.sql.adaptive.enabled=false --class Test Test.jar (3 output files in each 
partition key)

 

> dataset.sort.select.write.partitionBy does not return a sorted output if AQE 
> is enabled
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-44512
>                 URL: https://issues.apache.org/jira/browse/SPARK-44512
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.1
>         Environment: Code that replicates the problem:
>  
> import java.io.IOException;
> import java.util.List;
> import java.util.function.Function;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.spark.api.java.function.MapFunction;
> import org.apache.spark.sql.Encoders;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> import scala.Tuple3;
> // To compile: javac Test.java && jar cvf Test.jar Test.class
> // bug: spark-submit --class Test Test.jar
> // no bug: spark-submit --class Test Test.jar workaround
> // no bug: spark-submit --conf spark.sql.adaptive.enabled=false --class Test 
> Test.jar (3 output files in each partition key)
> public class Test {
>     public static void main(String args[]) throws IOException {
>         final var spark = SparkSession
>             .builder()
>             .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", 
> "false")
>             .getOrCreate();
>         final var hadoopConf = spark.sparkContext().hadoopConfiguration();
>         final var fs = FileSystem.get(hadoopConf);
>         fs.setWriteChecksum(false);
>         // create a minimal dataset that is enough to reproduce the bug
>         // The three columns are named _1, _2, and _3 (the field names of 
> Tuple3)
>         var dataset = spark.createDataset(List.of(
>                 new Tuple3<Long, String, String>(3L, "a", "r"),
>                 new Tuple3<Long, String, String>(3L, "b", "r"),
>                 new Tuple3<Long, String, String>(2L, "b", "q"),
>                 new Tuple3<Long, String, String>(2L, "a", "q"),
>                 new Tuple3<Long, String, String>(1L, "a", "p"),
>                 new Tuple3<Long, String, String>(1L, "b", "p")
>             ),
>             Encoders.tuple(Encoders.LONG(), Encoders.STRING(), 
> Encoders.STRING()))
>         .sort("_1")
>         .select("_2", "_3");
>         // This is an identity mapper, i.e. returns itself
>         // Enabled by adding an argument "workaround" when executing 
> spark-submit.
>         // With AQE enabled, .sort() will work as intended only if this 
> identity mapper
>         // is inserted between .sort() and .select() in the pipeline
>         if (args.length > 0 && args[0].equals("workaround")) {
>             dataset = dataset.map((MapFunction<Row, Row>) row -> row, 
> dataset.encoder());
>         }
>         // output column _3 to text files, partitioned by column _2
>         // _1 is only for sorting purpose, not used in output
>         // output will not be sorted without the identity mapper
>         dataset.write()
>             .mode("overwrite")
>             .partitionBy("_2")
>             .text("output");
>         dataset.explain();
>         spark.close();
>     }
> }
>            Reporter: Yiu-Chung Lee
>            Priority: Major
>         Attachments: Test.java
>
>
> (In this example the dataset is of type Tuple3, and the columns are named _1, 
> _2 and _3)
>  
> I found then when AQE is enabled, the following code does not produce an 
> output
> {{dataset.sort("_1")}}
> {{.select("_2", "_3")}}
> {{.write()}}
> {{.partitionBy("_2")}}
> {{.text("output");}}
>  
> However, if I insert an identity mapper between select and write, the output 
> would be sorted as expected.
> {{dataset.sort("_1")}}
> {{.select("_2", "_3")}}
> {{.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}}
> {{{}.write(){}}}{{{}.partitionBy("_2"){}}}
> {{.text("output")}}
> Below is the complete code that replicates the problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to