jackylee-ch commented on code in PR #8422:
URL: https://github.com/apache/incubator-gluten/pull/8422#discussion_r1905119402
##########
gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala:
##########
@@ -1196,6 +1203,86 @@ class ClickHouseAdaptiveQueryExecSuite extends
AdaptiveQueryExecSuite with Glute
}
}
+ testGluten("SPARK-32932: Do not use local shuffle read at final stage on
write command") {
+ withSQLConf(
+ SQLConf.PARTITION_OVERWRITE_MODE.key ->
PartitionOverwriteMode.DYNAMIC.toString,
+ SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true"
+ ) {
+ val data =
+ for (
+ i <- 1L to 10L;
+ j <- 1L to 3L
+ ) yield (i, j)
+
+ val df = data.toDF("i", "j").repartition($"j")
+ var noLocalread: Boolean = false
+ val listener = new QueryExecutionListener {
+ override def onSuccess(funcName: String, qe: QueryExecution,
durationNs: Long): Unit = {
+ qe.executedPlan match {
+ case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) =>
+ noLocalread = collect(plan) {
Review Comment:
Remove the child plan check, as we would add `FakeRowAdaptor`, and the check
has already been removed since 3.4.0.
##########
gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala:
##########
@@ -1196,6 +1203,86 @@ class ClickHouseAdaptiveQueryExecSuite extends
AdaptiveQueryExecSuite with Glute
}
}
+ testGluten("SPARK-32932: Do not use local shuffle read at final stage on
write command") {
+ withSQLConf(
+ SQLConf.PARTITION_OVERWRITE_MODE.key ->
PartitionOverwriteMode.DYNAMIC.toString,
+ SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true"
+ ) {
+ val data =
+ for (
+ i <- 1L to 10L;
+ j <- 1L to 3L
+ ) yield (i, j)
+
+ val df = data.toDF("i", "j").repartition($"j")
+ var noLocalread: Boolean = false
+ val listener = new QueryExecutionListener {
+ override def onSuccess(funcName: String, qe: QueryExecution,
durationNs: Long): Unit = {
+ qe.executedPlan match {
+ case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) =>
+ noLocalread = collect(plan) {
+ case exec: AQEShuffleReadExec if exec.isLocalRead => exec
+ }.isEmpty
+ case _ => // ignore other events
+ }
+ }
+ override def onFailure(
+ funcName: String,
+ qe: QueryExecution,
+ exception: Exception): Unit = {}
+ }
+ spark.listenerManager.register(listener)
+
+ withTable("t") {
+ df.write.partitionBy("j").saveAsTable("t")
+ sparkContext.listenerBus.waitUntilEmpty()
+ assert(noLocalread)
+ noLocalread = false
+ }
+
+ // Test DataSource v2
+ val format = classOf[NoopDataSource].getName
+ df.write.format(format).mode("overwrite").save()
+ sparkContext.listenerBus.waitUntilEmpty()
+ assert(noLocalread)
+ noLocalread = false
+
+ spark.listenerManager.unregister(listener)
+ }
+ }
+
+ testGluten(
+ "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2
write commands") {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+ var plan: SparkPlan = null
+ val listener = new QueryExecutionListener {
+ override def onSuccess(funcName: String, qe: QueryExecution,
durationNs: Long): Unit = {
+ plan = qe.executedPlan
+ }
+ override def onFailure(
+ funcName: String,
+ qe: QueryExecution,
+ exception: Exception): Unit = {}
+ }
+ spark.listenerManager.register(listener)
+ withTable("t1") {
+ val format = classOf[NoopDataSource].getName
+ Seq((0, 1)).toDF("x",
"y").write.format(format).mode("overwrite").save()
+
+ sparkContext.listenerBus.waitUntilEmpty()
+ assert(plan.isInstanceOf[V2TableWriteExec])
+ val childPlan = plan.asInstanceOf[V2TableWriteExec].child
+ assert(childPlan.isInstanceOf[FakeRowAdaptor])
+
assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec])
Review Comment:
Refine the child plan check
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]