Yiu-Chung Lee created SPARK-44512:
-------------------------------------
Summary: dataset.sort.select.write.partitionBy does not return a
sorted output if AQE is enabled
Key: SPARK-44512
URL: https://issues.apache.org/jira/browse/SPARK-44512
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.4.1
Environment: Code that replicates the problem:
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple3;

// To compile: javac Test.java && jar cvf Test.jar Test.class
// bug: spark-submit --class Test Test.jar
// no bug: spark-submit --class Test Test.jar workaround
// no bug: spark-submit --conf spark.sql.adaptive.enabled=false --class Test Test.jar
//         (3 output files in each partition key)
public class Test {
    public static void main(String[] args) throws IOException {
        final var spark = SparkSession
            .builder()
            .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
            .getOrCreate();
        final var hadoopConf = spark.sparkContext().hadoopConfiguration();
        final var fs = FileSystem.get(hadoopConf);
        fs.setWriteChecksum(false);

        // Create a minimal dataset that is enough to reproduce the bug.
        // The three columns are named _1, _2, and _3 (the field names of Tuple3).
        var dataset = spark.createDataset(List.of(
                new Tuple3<Long, String, String>(3L, "a", "r"),
                new Tuple3<Long, String, String>(3L, "b", "r"),
                new Tuple3<Long, String, String>(2L, "b", "q"),
                new Tuple3<Long, String, String>(2L, "a", "q"),
                new Tuple3<Long, String, String>(1L, "a", "p"),
                new Tuple3<Long, String, String>(1L, "b", "p")
            ),
            Encoders.tuple(Encoders.LONG(), Encoders.STRING(), Encoders.STRING()))
            .sort("_1")
            .select("_2", "_3");

        // This is an identity mapper, i.e. it returns each row unchanged.
        // It is enabled by passing the argument "workaround" to spark-submit.
        // With AQE enabled, .sort() works as intended only if this identity
        // mapper is inserted between .select() and .write() in the pipeline.
        if (args.length > 0 && args[0].equals("workaround")) {
            dataset = dataset.map((MapFunction<Row, Row>) row -> row,
                dataset.encoder());
        }

        // Output column _3 to text files, partitioned by column _2.
        // _1 is used only for sorting and does not appear in the output.
        // The output will not be sorted without the identity mapper.
        dataset.write()
            .mode("overwrite")
            .partitionBy("_2")
            .text("output");
        dataset.explain();
        spark.close();
    }
}
Reporter: Yiu-Chung Lee
Attachments: Test.java
(In this example the dataset is of type Tuple3, and the columns are named _1,
_2 and _3)
I found that when AQE is enabled, the following code does not produce sorted output:
{{dataset.sort("_1")}}
{{.select("_2", "_3")}}
{{.write()}}
{{.partitionBy("_2")}}
{{.text("output");}}
However, if I insert an identity mapper between select and write, the output
would be sorted as expected.
{{dataset.sort("_1")}}
{{.select("_2", "_3")}}
{{.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}}
{{.write()}}
{{.partitionBy("_2")}}
{{.text("output")}}
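To make "sorted as expected" concrete, here is a plain-Java sketch (no Spark involved) that computes the order one would expect inside each partition directory for the six sample rows. The class ExpectedOutput, the helper class Rec and the methods sampleRows() and expected() are hypothetical names used only for this illustration; they are not part of Test.java or of Spark.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExpectedOutput {
    // One input row; the fields mirror columns _1, _2 and _3 of the report.
    static final class Rec {
        final long c1;
        final String c2;
        final String c3;

        Rec(long c1, String c2, String c3) {
            this.c1 = c1;
            this.c2 = c2;
            this.c3 = c3;
        }
    }

    // The six rows used by the reproducer, in their original order.
    static List<Rec> sampleRows() {
        return List.of(
            new Rec(3L, "a", "r"),
            new Rec(3L, "b", "r"),
            new Rec(2L, "b", "q"),
            new Rec(2L, "a", "q"),
            new Rec(1L, "a", "p"),
            new Rec(1L, "b", "p"));
    }

    // Sort by _1, then group the _3 values by _2, keeping the sorted order
    // inside each group. This models sort("_1") followed by partitionBy("_2").
    static Map<String, List<String>> expected(List<Rec> rows) {
        List<Rec> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparingLong((Rec r) -> r.c1));
        Map<String, List<String>> byKey = new LinkedHashMap<>();
        for (Rec r : sorted) {
            byKey.computeIfAbsent(r.c2, k -> new ArrayList<>()).add(r.c3);
        }
        return byKey;
    }

    public static void main(String[] args) {
        // prints {a=[p, q, r], b=[p, q, r]}
        System.out.println(expected(sampleRows()));
    }
}
```

In other words, both the _2=a and the _2=b directories should contain p, q, r in that order when the sort is honored.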
The complete code that replicates the problem is in the Environment field above and in the attached Test.java.
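One possible way to check a run is to read the part files of each partition directory and test whether the lines come out in order. The sketch below uses only the Java standard library. The class name SortedOutputCheck and its methods are hypothetical, and it rests on two assumptions that the report does not state: that part files sorted by file name reflect the intended row order, and that the _3 values (p, q, r) increase together with _1, as they do in the sample data.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SortedOutputCheck {
    // Reads every part file in one partition directory (e.g. output/_2=a),
    // in file-name order, and returns all lines concatenated.
    // Assumption: file-name order matches the order Spark wrote the rows.
    static List<String> readPartition(Path partitionDir) throws IOException {
        List<Path> parts;
        try (Stream<Path> files = Files.list(partitionDir)) {
            parts = files
                .filter(p -> p.getFileName().toString().startsWith("part-"))
                .sorted()
                .collect(Collectors.toList());
        }
        List<String> lines = new ArrayList<>();
        for (Path part : parts) {
            lines.addAll(Files.readAllLines(part));
        }
        return lines;
    }

    // True if the lines are in non-decreasing lexicographic order.
    static boolean isSorted(List<String> lines) {
        for (int i = 1; i < lines.size(); i++) {
            if (lines.get(i - 1).compareTo(lines.get(i)) > 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the "output" directory written by the reproducer.
        Path output = Path.of(args.length > 0 ? args[0] : "output");
        List<Path> dirs;
        try (Stream<Path> entries = Files.list(output)) {
            dirs = entries.filter(Files::isDirectory).sorted()
                .collect(Collectors.toList());
        }
        for (Path dir : dirs) {
            List<String> lines = readPartition(dir);
            System.out.println(dir.getFileName() + ": " + lines
                + (isSorted(lines) ? " (sorted)" : " (NOT sorted)"));
        }
    }
}
```

Run it from the directory that contains output, for example with java SortedOutputCheck.java, after each of the three spark-submit variants listed in the reproducer to compare the results.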