zzzzming95 created SPARK-43327:
----------------------------------

             Summary: Trigger `committer.setupJob` before plan execute in 
`FileFormatWriter`
                 Key: SPARK-43327
                 URL: https://issues.apache.org/jira/browse/SPARK-43327
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.2.3
            Reporter: zzzzming95


In this jira, the case where `outputOrdering` might not work if AQE is enabled 
has been resolved. However, since it materializes the AQE plan in advance 
(triggers getFinalPhysicalPlan) , it may cause the committer.setupJob(job) to 
not execute When `AdaptiveSparkPlanExec#getFinalPhysicalPlan()` is executed 
with an error.

Normally this step should be executed after committer.setupJob(job).

This may eventually result in the insertoverwrite directory being deleted.

 
{code:java}
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.TableIdentifier

sql("CREATE TABLE IF NOT EXISTS spark32_overwrite(amt1 int) STORED AS ORC")
sql("CREATE TABLE IF NOT EXISTS spark32_overwrite2(amt1 long) STORED AS ORC")
sql("INSERT OVERWRITE TABLE spark32_overwrite2 select 6000044164")
sql("set spark.sql.ansi.enabled=true")

val loc =
  
spark.sessionState.catalog.getTableMetadata(TableIdentifier("spark32_overwrite")).location

val fs = FileSystem.get(loc, spark.sparkContext.hadoopConfiguration)
println("Location exists: " + fs.exists(new Path(loc)))

try {
  sql("INSERT OVERWRITE TABLE spark32_overwrite select amt1 from " +
    "(select cast(amt1 as int) as amt1 from spark32_overwrite2 distribute by 
amt1)")
} finally {
  println("Location exists: " + fs.exists(new Path(loc)))
} {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to