Yiu-Chung Lee created SPARK-44512:
-------------------------------------

             Summary: dataset.sort.select.write.partitionBy does not return a sorted output if AQE is enabled
                 Key: SPARK-44512
                 URL: https://issues.apache.org/jira/browse/SPARK-44512
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.4.1
         Environment: Code that replicates the problem:

 

import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple3;

// To compile: javac Test.java && jar cvf Test.jar Test.class
// bug: spark-submit --class Test Test.jar
// no bug: spark-submit --class Test Test.jar workaround
// no bug: spark-submit --conf spark.sql.adaptive.enabled=false --class Test Test.jar
//         (3 output files in each partition key)

public class Test {
    public static void main(String[] args) throws IOException {
        final var spark = SparkSession
            .builder()
            .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
            .getOrCreate();
        final var hadoopConf = spark.sparkContext().hadoopConfiguration();
        final var fs = FileSystem.get(hadoopConf);
        fs.setWriteChecksum(false);

        // create a minimal dataset that is enough to reproduce the bug
        // The three columns are named _1, _2, and _3 (the field names of Tuple3)
        var dataset = spark.createDataset(List.of(
                new Tuple3<Long, String, String>(3L, "a", "r"),
                new Tuple3<Long, String, String>(3L, "b", "r"),
                new Tuple3<Long, String, String>(2L, "b", "q"),
                new Tuple3<Long, String, String>(2L, "a", "q"),
                new Tuple3<Long, String, String>(1L, "a", "p"),
                new Tuple3<Long, String, String>(1L, "b", "p")
            ),
            Encoders.tuple(Encoders.LONG(), Encoders.STRING(), Encoders.STRING()))
        .sort("_1")
        .select("_2", "_3");

        // Identity mapper: returns each row unchanged.
        // Enabled by passing the argument "workaround" to spark-submit.
        // With AQE enabled, .sort() works as intended only if this identity
        // mapper is inserted after .select() and before .write() in the pipeline.
        if (args.length > 0 && args[0].equals("workaround")) {
            dataset = dataset.map((MapFunction<Row, Row>) row -> row, dataset.encoder());
        }

        // output column _3 to text files, partitioned by column _2
        // _1 is only for sorting purpose, not used in output
        // output will not be sorted without the identity mapper
        dataset.write()
            .mode("overwrite")
            .partitionBy("_2")
            .text("output");
        dataset.explain();
        spark.close();
    }
}
            Reporter: Yiu-Chung Lee
         Attachments: Test.java

(In this example the dataset is of type Tuple3, and the columns are named _1, _2 and _3.)

 

I found that when AQE is enabled, the following code does not produce sorted output:

{{dataset.sort("_1")}}
{{.select("_2", "_3")}}
{{.write()}}
{{.partitionBy("_2")}}
{{.text("output");}}

 

However, if I insert an identity mapper between select and write, the output is sorted as expected:

{{dataset.sort("_1")}}
{{.select("_2", "_3")}}
{{.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}}
{{.write()}}
{{.partitionBy("_2")}}
{{.text("output")}}
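One way to confirm whether a given run is affected is to read the written files back and check their order. The sketch below is not part of the original report. It assumes the job wrote to the relative directory "output" with Hive-style partition directories "_2=a" and "_2=b", that the part files are uncompressed text, and that reading part files in file-name order reflects the order in which rows were written. Because _1 = 1, 2, 3 maps to _3 = p, q, r in the test dataset, a correctly sorted partition should list its lines in ascending order. The class name CheckOutputOrder and its helper methods are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CheckOutputOrder {
    // Reads every part-* file in one partition directory, in file-name order,
    // and returns the concatenated lines. (File-name order is an assumption
    // about how the written rows map onto files.)
    static List<String> readPartition(Path partitionDir) throws IOException {
        List<Path> parts;
        try (Stream<Path> files = Files.list(partitionDir)) {
            parts = files
                .filter(p -> p.getFileName().toString().startsWith("part-"))
                .sorted()
                .collect(Collectors.toList());
        }
        List<String> lines = new ArrayList<>();
        for (Path part : parts) {
            lines.addAll(Files.readAllLines(part));
        }
        return lines;
    }

    // Returns true if the lines are in non-decreasing lexicographic order.
    static boolean isSorted(List<String> lines) {
        for (int i = 1; i < lines.size(); i++) {
            if (lines.get(i - 1).compareTo(lines.get(i)) > 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        // Partition keys "a" and "b" are the values of column _2 in the test dataset.
        for (String key : List.of("a", "b")) {
            Path dir = Path.of("output", "_2=" + key);
            if (!Files.isDirectory(dir)) {
                System.out.println(dir + ": not found, run the Spark job first");
                continue;
            }
            List<String> lines = readPartition(dir);
            System.out.println(dir + ": " + lines
                + (isSorted(lines) ? " (sorted)" : " (NOT sorted)"));
        }
    }
}
```

Run it from the directory where spark-submit was invoked, once after the plain run and once after the "workaround" run, and compare what is reported for each partition.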

The complete code that replicates the problem is in the Environment field above (also attached as Test.java).



