[
https://issues.apache.org/jira/browse/SPARK-44512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yiu-Chung Lee updated SPARK-44512:
----------------------------------
Environment: (was: Code that replicates the problem:
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple3;
// To compile: javac Test.java && jar cvf Test.jar Test.class
// bug: spark-submit --class Test Test.jar
// no bug: spark-submit --class Test Test.jar workaround
// no bug: spark-submit --conf spark.sql.adaptive.enabled=false --class Test Test.jar
//         (3 output files in each partition key)
public class Test {
    public static void main(String args[]) throws IOException {
        final var spark = SparkSession
            .builder()
            .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
            .getOrCreate();
        final var hadoopConf = spark.sparkContext().hadoopConfiguration();
        final var fs = FileSystem.get(hadoopConf);
        fs.setWriteChecksum(false);
        // create a minimal dataset that is enough to reproduce the bug
        // The three columns are named _1, _2, and _3 (the field names of Tuple3)
        var dataset = spark.createDataset(List.of(
                new Tuple3<Long, String, String>(3L, "a", "r"),
                new Tuple3<Long, String, String>(3L, "b", "r"),
                new Tuple3<Long, String, String>(2L, "b", "q"),
                new Tuple3<Long, String, String>(2L, "a", "q"),
                new Tuple3<Long, String, String>(1L, "a", "p"),
                new Tuple3<Long, String, String>(1L, "b", "p")
            ),
            Encoders.tuple(Encoders.LONG(), Encoders.STRING(),
                Encoders.STRING()))
            .sort("_1")
            .select("_2", "_3");
        // This is an identity mapper, i.e. it returns each row unchanged.
        // Enabled by adding an argument "workaround" when executing spark-submit.
        // With AQE enabled, .sort() will work as intended only if this identity
        // mapper is inserted between .select() and .write() in the pipeline
        if (args.length > 0 && args[0].equals("workaround")) {
            dataset = dataset.map((MapFunction<Row, Row>) row -> row,
                dataset.encoder());
        }
        // output column _3 to text files, partitioned by column _2
        // _1 is only for sorting purpose, not used in output
        // output will not be sorted without the identity mapper
        dataset.write()
            .mode("overwrite")
            .partitionBy("_2")
            .text("output");
        dataset.explain();
        spark.close();
    }
})
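To tell whether a given run hit the bug, the lines written under one partition directory can be checked for order. The following is only a minimal sketch and not part of the original report: the class name SortedOutputCheck and its methods are hypothetical, it uses plain java.nio rather than any Spark API, and it assumes the output sits on the local filesystem in files whose names start with "part-". It also assumes that concatenating those files in file-name order reflects the intended row order. For the dataset above, sorting by _1 should give p, q, r in each partition, so plain string order is enough for the check.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper for inspecting the reproduction's output; not a Spark API.
final class SortedOutputCheck {

    // Returns true if no line is smaller than the one before it
    // (String.compareTo order). An empty list counts as sorted.
    static boolean isSorted(List<String> lines) {
        for (int i = 1; i < lines.size(); i++) {
            if (lines.get(i - 1).compareTo(lines.get(i)) > 0) {
                return false;
            }
        }
        return true;
    }

    // Reads all files named part-* in one partition directory, in file-name
    // order, and returns their lines concatenated. Treating file-name order
    // as row order is an assumption of this sketch.
    static List<String> readPartition(Path partitionDir) throws IOException {
        List<String> lines = new ArrayList<>();
        try (Stream<Path> files = Files.list(partitionDir)) {
            List<Path> parts = files
                .filter(p -> p.getFileName().toString().startsWith("part-"))
                .sorted()
                .collect(Collectors.toList());
            for (Path part : parts) {
                lines.addAll(Files.readAllLines(part));
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the partition for _2 = "a" written by Test.java.
        Path dir = Path.of(args.length > 0 ? args[0] : "output/_2=a");
        List<String> lines = readPartition(dir);
        System.out.println(dir + ": " + lines + " sorted=" + isSorted(lines));
    }
}
```

Running it once against output/_2=a and once against output/_2=b after each of the three spark-submit variants listed above would show which variants keep the order.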
> dataset.sort.select.write.partitionBy does not return a sorted output if AQE
> is enabled
> ---------------------------------------------------------------------------------------
>
> Key: SPARK-44512
> URL: https://issues.apache.org/jira/browse/SPARK-44512
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.4.1
> Reporter: Yiu-Chung Lee
> Priority: Major
> Attachments: Test.java
>
>
> (In this example the dataset is of type Tuple3, and the columns are named _1,
> _2 and _3)
>
> I found that when AQE is enabled, the following code does not produce
> sorted output:
> {{dataset.sort("_1")}}
> {{.select("_2", "_3")}}
> {{.write()}}
> {{.partitionBy("_2")}}
> {{.text("output");}}
>
> However, if I insert an identity mapper between select and write, the output
> would be sorted as expected.
> {{dataset.sort("_1")}}
> {{.select("_2", "_3")}}
> {{.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}}
> {{.write()}}
> {{.partitionBy("_2")}}
> {{.text("output")}}
> Below is the complete code that replicates the problem.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)