[ https://issues.apache.org/jira/browse/SPARK-37210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17572754#comment-17572754 ]
Zhen Wang commented on SPARK-37210:
-----------------------------------

> BTW, could you please reopen the PR freshly? I'll try to ping the other committers too.

I'm trying to implement always using the staging dir; I'll send a PR after adding test cases. (A sketch of that direction follows the error output below.)


An error occurred while concurrently writing to different static partitions
----------------------------------------------------------------------------

                 Key: SPARK-37210
                 URL: https://issues.apache.org/jira/browse/SPARK-37210
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.1.1, 3.2.0
            Reporter: Zhen Wang
            Priority: Major
         Attachments: [SPARK-37210]_Write_to_static_partition_in_dynamic_write_mode.patch, image-2021-11-05-15-28-41-393.png

An error occurred while concurrently writing to different static partitions.

When writing to a static partition, committerOutputPath is the location path of the table. When multiple jobs write to the same table concurrently, the shared _temporary path is deleted as soon as one job commits, causing the other job's tasks to fail (a minimal sketch after the test code below illustrates the collision).

test code:

{code:java}
import java.util

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object HiveTests {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("HiveTests")
      .enableHiveSupport()
      .getOrCreate()

    // rows
    val users1 = new util.ArrayList[Row]()
    users1.add(Row(1, "user1", "2021-11-03", 10))
    users1.add(Row(2, "user2", "2021-11-03", 10))
    users1.add(Row(3, "user3", "2021-11-03", 10))

    // schema
    val structType = StructType(Array(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("dt", StringType, true),
      StructField("hour", IntegerType, true)
    ))

    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("drop table if exists default.test")
    spark.sql(
      """
        |create table if not exists default.test (
        |  id int,
        |  name string)
        |partitioned by (dt string, hour int)
        |stored as parquet
        |""".stripMargin)
    spark.sql("desc formatted default.test").show()

    spark.sqlContext
      .createDataFrame(users1, structType)
      .select("id", "name")
      .createOrReplaceTempView("user1")

    // Two threads overwrite two different static partitions of the same table.
    val thread1 = new Thread(() => {
      // spark.sql("INSERT INTO TABLE test PARTITION(dt = '2021-11-03', hour=10) select * from user1")
      spark.sql("INSERT OVERWRITE TABLE test PARTITION(dt = '2021-11-03', hour=10) select * from user1")
    })
    thread1.start()

    val thread2 = new Thread(() => {
      // spark.sql("INSERT INTO TABLE test PARTITION(dt = '2021-11-04', hour=10) select * from user1")
      spark.sql("INSERT OVERWRITE TABLE test PARTITION(dt = '2021-11-04', hour=10) select * from user1")
    })
    thread2.start()

    thread1.join()
    thread2.join()

    spark.sql("select * from test").show()
    spark.stop()
  }
}
{code}
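The collision is visible without running Spark at all. Below is a minimal sketch (the object `TemporaryPathCollision` and helper `temporaryDir` are illustrative names, not Spark or Hadoop internals): with a static partition spec, both jobs commit against the table location itself, so they resolve the same FileOutputCommitter staging directory, matching the `test/_temporary/0/...` paths in the error output below.

{code:java}
import org.apache.hadoop.fs.Path

// Illustrative sketch: FileOutputCommitter stages task output under
// <outputPath>/_temporary/<appAttemptId>, and the app attempt id is 0
// here (see the attempt paths in the error output below).
object TemporaryPathCollision {
  def temporaryDir(committerOutputPath: Path): Path =
    new Path(new Path(committerOutputPath, "_temporary"), "0")

  def main(args: Array[String]): Unit = {
    val tableLocation = new Path("file:/data/spark-examples/spark-warehouse/test")
    val job1 = temporaryDir(tableLocation) // INSERT OVERWRITE ... dt = '2021-11-03'
    val job2 = temporaryDir(tableLocation) // INSERT OVERWRITE ... dt = '2021-11-04'
    // Same directory for both jobs: whichever job commits first deletes
    // _temporary, and the other job's in-flight task attempts then fail.
    assert(job1 == job2)
    println(s"shared staging dir: $job1")
  }
}
{code}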
error message:

{code:java}
21/11/04 19:01:21 ERROR Utils: Aborting task
ExitCodeException exitCode=1: chmod: cannot access '/data/spark-examples/spark-warehouse/test/_temporary/0/_temporary/attempt_202111041901182933014038999149736_0001_m_000001_4/dt=2021-11-03/hour=10/.part-00001-95895b03-45d2-4ac6-806b-b76fd1dfa3dc.c000.snappy.parquet.crc': No such file or directory
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:1008)
	at org.apache.hadoop.util.Shell.run(Shell.java:901)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1213)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1307)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1289)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:324)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:294)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:439)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:428)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:459)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:437)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:521)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:329)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:482)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:420)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:409)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:150)
	at org.apache.spark.sql.execution.datasources.BaseDynamicPartitionDataWriter.renewCurrentWriter(FileFormatDataWriter.scala:290)
	at org.apache.spark.sql.execution.datasources.DynamicPartitionDataSingleWriter.write(FileFormatDataWriter.scala:357)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithMetrics(FileFormatDataWriter.scala:85)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:92)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:304)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:311)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
21/11/04 19:01:21 WARN FileOutputCommitter: Could not delete file:/data/spark-examples/spark-warehouse/test/_temporary/0/_temporary/attempt_202111041901182933014038999149736_0001_m_000001_4
{code}
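For context on the staging-dir direction mentioned in the comment above: in dynamic partition overwrite mode, Spark's HadoopMapReduceCommitProtocol stages task files under a job-specific ".spark-staging-<jobId>" directory rather than the shared _temporary path, which is the idea behind the attached [SPARK-37210]_Write_to_static_partition_in_dynamic_write_mode.patch. Below is a user-level approximation of that idea, not the patch itself: the partition values are moved into the SELECT so the write takes the dynamic code path. It assumes the session from the reproduction above (table `test`, temp view `user1`) and the default conversion of Hive parquet tables to the datasource write path.

{code:java}
// User-level approximation of the staging-dir idea; run inside the
// reproduction's SparkSession after the table and view are created.
spark.sql("set spark.sql.sources.partitionOverwriteMode=dynamic")

val thread1 = new Thread(() => {
  // Partition values come from the SELECT instead of a static spec,
  // so each job stages its files in its own .spark-staging-<jobId> dir.
  spark.sql("INSERT OVERWRITE TABLE test PARTITION(dt, hour) " +
    "select id, name, '2021-11-03' as dt, 10 as hour from user1")
})
val thread2 = new Thread(() => {
  spark.sql("INSERT OVERWRITE TABLE test PARTITION(dt, hour) " +
    "select id, name, '2021-11-04' as dt, 10 as hour from user1")
})
thread1.start(); thread2.start()
thread1.join(); thread2.join()
{code}

Whether this sidesteps the shared _temporary directory in every committer configuration is what the proposed "always use the staging dir" PR is meant to settle; the sketch only shows the per-job staging layout that the fix would generalize to static partition specs.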