[ https://issues.apache.org/jira/browse/SPARK-43327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zzzzming95 updated SPARK-43327:
-------------------------------
Description:
SPARK-40588 resolved the case where `outputOrdering` might not work when AQE is
enabled:
https://issues.apache.org/jira/browse/SPARK-40588
However, that fix materializes the AQE plan in advance (it triggers
`getFinalPhysicalPlan`). If `AdaptiveSparkPlanExec#getFinalPhysicalPlan()` fails
with an error, `committer.setupJob(job)` is never executed.
Normally, the plan should be materialized only after `committer.setupJob(job)`
has run.
As a result, the INSERT OVERWRITE target directory may end up being deleted.
{code:java}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.catalyst.TableIdentifier

// Target table (int column) and source table (long column).
sql("CREATE TABLE IF NOT EXISTS spark32_overwrite(amt1 int) STORED AS ORC")
sql("CREATE TABLE IF NOT EXISTS spark32_overwrite2(amt1 long) STORED AS ORC")
// 6000044164 does not fit in an int, so the cast below is expected to fail in ANSI mode.
sql("INSERT OVERWRITE TABLE spark32_overwrite2 select 6000044164")
sql("set spark.sql.ansi.enabled=true")

val loc = spark.sessionState.catalog.getTableMetadata(TableIdentifier("spark32_overwrite")).location
val fs = FileSystem.get(loc, spark.sparkContext.hadoopConfiguration)
// Check the table location before the write attempt.
println("Location exists: " + fs.exists(new Path(loc)))

try {
  sql("INSERT OVERWRITE TABLE spark32_overwrite select amt1 from " +
    "(select cast(amt1 as int) as amt1 from spark32_overwrite2 distribute by amt1)")
} finally {
  // Check the table location again after the write attempt.
  println("Location exists: " + fs.exists(new Path(loc)))
}
{code}
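
The ordering problem described above can also be sketched without Spark. The
following is a toy model only, not the actual `FileFormatWriter` code:
`RecordingCommitter`, `materializePlan`, `materializeFirst` and `setupFirst` are
hypothetical names introduced for illustration. It shows only which calls are
reached under each ordering when plan materialization fails; it does not model
how the directory ends up deleted.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

// Hypothetical stand-in for a commit protocol; it only records which calls ran.
final class RecordingCommitter {
  val calls: ArrayBuffer[String] = ArrayBuffer.empty[String]
  def setupJob(): Unit = calls += "setupJob"
}

object SetupOrderSketch {
  // Hypothetical stand-in for AdaptiveSparkPlanExec#getFinalPhysicalPlan() failing.
  def materializePlan(): Unit =
    throw new ArithmeticException("simulated failure while materializing the plan")

  // Ordering described in the issue: the plan is materialized before setupJob.
  def materializeFirst(c: RecordingCommitter): Try[Unit] = Try {
    materializePlan() // throws, so setupJob below is never reached
    c.setupJob()
  }

  // Ordering proposed in the issue title: setupJob runs before the plan executes.
  def setupFirst(c: RecordingCommitter): Try[Unit] = Try {
    c.setupJob()
    materializePlan() // still throws, but setupJob has already run
  }

  def main(args: Array[String]): Unit = {
    val a = new RecordingCommitter
    val b = new RecordingCommitter
    materializeFirst(a)
    setupFirst(b)
    println("materialize first, setupJob ran: " + a.calls.contains("setupJob"))
    println("setupJob first, setupJob ran:    " + b.calls.contains("setupJob"))
  }
}
```

In this sketch both orderings still end in a failure; the only difference is
whether `setupJob` has been called by the time the failure surfaces, which is
the property the issue title asks for.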
> Trigger `committer.setupJob` before plan execute in `FileFormatWriter`
> ----------------------------------------------------------------------
>
> Key: SPARK-43327
> URL: https://issues.apache.org/jira/browse/SPARK-43327
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.2.3
> Reporter: zzzzming95
> Priority: Major