[ 
https://issues.apache.org/jira/browse/SPARK-44512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yiu-Chung Lee updated SPARK-44512:
----------------------------------
    Environment:     (was: Code that replicates the problem:

 

import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple3;

// To compile: javac Test.java && jar cvf Test.jar Test.class
// bug: spark-submit --class Test Test.jar
// no bug: spark-submit --class Test Test.jar workaround
// no bug: spark-submit --conf spark.sql.adaptive.enabled=false --class Test 
Test.jar (3 output files in each partition key)

public class Test {
    public static void main(String args[]) throws IOException {
        final var spark = SparkSession
            .builder()
            .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
            .getOrCreate();
        final var hadoopConf = spark.sparkContext().hadoopConfiguration();
        final var fs = FileSystem.get(hadoopConf);
        fs.setWriteChecksum(false);

        // create a minimal dataset that is enough to reproduce the bug
        // The three columns are named _1, _2, and _3 (the field names of 
Tuple3)
        var dataset = spark.createDataset(List.of(
                new Tuple3<Long, String, String>(3L, "a", "r"),
                new Tuple3<Long, String, String>(3L, "b", "r"),
                new Tuple3<Long, String, String>(2L, "b", "q"),
                new Tuple3<Long, String, String>(2L, "a", "q"),
                new Tuple3<Long, String, String>(1L, "a", "p"),
                new Tuple3<Long, String, String>(1L, "b", "p")
            ),
            Encoders.tuple(Encoders.LONG(), Encoders.STRING(), 
Encoders.STRING()))
        .sort("_1")
        .select("_2", "_3");

        // This is an identity mapper, i.e. returns itself
        // Enabled by adding an argument "workaround" when executing 
spark-submit.
        // With AQE enabled, .sort() will work as intended only if this 
identity mapper
        // is inserted between .sort() and .select() in the pipeline
        if (args.length > 0 && args[0].equals("workaround")) {
            dataset = dataset.map((MapFunction<Row, Row>) row -> row, 
dataset.encoder());
        }

        // output column _3 to text files, partitioned by column _2
        // _1 is only for sorting purpose, not used in output
        // output will not be sorted without the identity mapper
        dataset.write()
            .mode("overwrite")
            .partitionBy("_2")
            .text("output");
        dataset.explain();
        spark.close();
    }
})

> dataset.sort.select.write.partitionBy does not return a sorted output if AQE 
> is enabled
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-44512
>                 URL: https://issues.apache.org/jira/browse/SPARK-44512
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.1
>            Reporter: Yiu-Chung Lee
>            Priority: Major
>         Attachments: Test.java
>
>
> (In this example the dataset is of type Tuple3, and the columns are named _1, 
> _2 and _3)
>  
> I found then when AQE is enabled, the following code does not produce an 
> output
> {{dataset.sort("_1")}}
> {{.select("_2", "_3")}}
> {{.write()}}
> {{.partitionBy("_2")}}
> {{.text("output");}}
>  
> However, if I insert an identity mapper between select and write, the output 
> would be sorted as expected.
> {{dataset.sort("_1")}}
> {{.select("_2", "_3")}}
> {{.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}}
> {{{}.write(){}}}{{{}.partitionBy("_2"){}}}
> {{.text("output")}}
> Below is the complete code that replicates the problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to