[
https://issues.apache.org/jira/browse/SPARK-34372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17281815#comment-17281815
]
Attila Zsolt Piros commented on SPARK-34372:
--------------------------------------------
A direct output committer combined with speculation can lead to this kind of
problem, even to data loss.
Please check this out:
https://issues.apache.org/jira/browse/SPARK-10063
Although DirectParquetOutputCommitter has been removed, you are using
DirectFileOutputCommitter.
There should be a warning in the logs:
[https://github.com/apache/spark/blob/18b30107adb37d3c7a767a20cc02813f0fdb86da/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L1050-L1057]
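To illustrate the failure mode, here is a hypothetical stand-alone simulation
(not Spark code; the file name and byte counts mirror the log below). A direct
committer writes straight to the final path, so a speculative attempt that is
killed mid-write clobbers the complete file with a truncated one:

```python
# Simulate two speculative "attempts" writing the same output file
# directly (no temporary file + atomic rename). If the slower attempt
# is killed partway through, it overwrites the complete file with a
# truncated one -- the corruption described in this issue.
import os
import tempfile

def direct_write(path, data, interrupt_after=None):
    """Write directly to the final path; optionally stop partway
    through, as a killed speculative task would."""
    with open(path, "wb") as f:
        if interrupt_after is None:
            f.write(data)
        else:
            f.write(data[:interrupt_after])  # task killed mid-upload

full = b"x" * 8_461_990  # attempt A's complete output
out = os.path.join(tempfile.mkdtemp(), "part-00426.csv")

direct_write(out, full)                             # attempt A succeeds
direct_write(out, full, interrupt_after=3_145_728)  # attempt B is killed

print(os.path.getsize(out))  # 3145728 -- the complete file was clobbered
```

A committer that writes to a task-local temporary path and commits via rename
avoids this, since the killed attempt's partial file never reaches the final
location.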
> Speculation results in broken CSV files in Amazon S3
> ----------------------------------------------------
>
> Key: SPARK-34372
> URL: https://issues.apache.org/jira/browse/SPARK-34372
> Project: Spark
> Issue Type: Bug
> Components: Input/Output
> Affects Versions: 2.4.7
> Environment: Amazon EMR with AMI version 5.32.0
> Reporter: Daehee Han
> Priority: Minor
> Labels: csv, s3, spark, speculation
>
> Hi, we've been experiencing corrupted rows when partitioned CSV files are
> written to Amazon S3. Some records were found broken without any error on
> Spark. Digging into the root cause, we found that Spark speculation retried
> a partition that was being uploaded slowly and ended up uploading only part
> of the partition, leaving broken data in S3.
> Here are the stack traces we've found. There are two executors involved - A:
> the first executor, which tried to upload the file but took much longer than
> the other executors (it still succeeded), which made Spark speculation cut
> in and kick off another executor, B. Executor B started to upload the file
> too, but was interrupted during the upload (killed: another attempt
> succeeded) and ended up uploading only part of the whole file. You can see
> in the log that the file executor A uploaded (8461990 bytes originally) was
> overwritten by executor B (which uploaded only 3145728 bytes).
>
> Executor A:
> {quote}21/01/28 17:22:21 INFO Executor: Running task 426.0 in stage 45.0 (TID
> 13201)
> 21/01/28 17:22:21 INFO ShuffleBlockFetcherIterator: Getting 470 non-empty
> blocks including 10 local blocks and 460 remote blocks
> 21/01/28 17:22:21 INFO ShuffleBlockFetcherIterator: Started 46 remote
> fetches in 18 ms
> 21/01/28 17:22:21 INFO FileOutputCommitter: File Output Committer Algorithm
> version is 2
> 21/01/28 17:22:21 INFO FileOutputCommitter: FileOutputCommitter skip cleanup
> _temporary folders under output directory:false, ignore cleanup failures:
> true
> 21/01/28 17:22:21 INFO DirectFileOutputCommitter: Direct Write: ENABLED
> 21/01/28 17:22:21 INFO SQLConfCommitterProvider: Using output committer class
> 21/01/28 17:22:21 INFO CSEMultipartUploadOutputStream: close
> closed:false
> s3://{obfuscated}/part-00426-7d5677a9-f740-4db6-9d3c-dc589d75e965-c000.csv
> 21/01/28 17:22:31 INFO DefaultMultipartUploadDispatcher: Completed multipart
> upload of 1 parts 8461990 bytes
> 21/01/28 17:22:31 INFO CSEMultipartUploadOutputStream: Finished uploading
> {obfuscated}/part-00426-7d5677a9-f740-4db6-9d3c-dc589d75e965-c000.csv.
> Elapsed seconds: 10.
> 21/01/28 17:22:31 INFO SparkHadoopMapRedUtil: No need to commit output of
> task because needsTaskCommit=false:
> attempt_20210128172219_0045_m_000426_13201
> 21/01/28 17:22:31 INFO Executor: Finished task 426.0 in stage 45.0 (TID
> 13201). 8782 bytes result sent to driver
> {quote}
> Executor B:
> {quote}21/01/28 17:22:31 INFO CoarseGrainedExecutorBackend: Got assigned
> task 13245
> 21/01/28 17:22:31 INFO Executor: Running task 426.1 in stage 45.0 (TID
> 13245)
> 21/01/28 17:22:31 INFO ShuffleBlockFetcherIterator: Getting 470 non-empty
> blocks including 11 local blocks and 459 remote blocks
> 21/01/28 17:22:31 INFO ShuffleBlockFetcherIterator: Started 46 remote
> fetches in 2 ms
> 21/01/28 17:22:31 INFO FileOutputCommitter: File Output Committer Algorithm
> version is 2
> 21/01/28 17:22:31 INFO FileOutputCommitter: FileOutputCommitter skip cleanup
> _temporary folders under output directory:false, ignore cleanup failures:
> true
> 21/01/28 17:22:31 INFO DirectFileOutputCommitter: Direct Write: ENABLED
> 21/01/28 17:22:31 INFO SQLConfCommitterProvider: Using output committer
> class org.apache.hadoop.mapreduce.lib.output.DirectFileOutputCommitter
> 21/01/28 17:22:31 INFO Executor: Executor is trying to kill task 426.1 in
> stage 45.0 (TID 13245), reason: another attempt succeeded
> 21/01/28 17:22:31 INFO CSEMultipartUploadOutputStream: close closed:false
> s3://{obfuscated}/part-00426-7d5677a9-f740-4db6-9d3c-dc589d75e965-c000.csv
> 21/01/28 17:22:32 INFO DefaultMultipartUploadDispatcher: Completed multipart
> upload of 1 parts 3145728 bytes
> 21/01/28 17:22:32 INFO CSEMultipartUploadOutputStream: Finished uploading
> {obfuscated}/part-00426-7d5677a9-f740-4db6-9d3c-dc589d75e965-c000.csv.
> Elapsed seconds: 0.
> 21/01/28 17:22:32 ERROR Utils: Aborting task
> com.univocity.parsers.common.TextWritingException: Error writing row.
> Internal state when error was thrown: recordCount=18449,
> recordData=[{obfuscated}] at
> com.univocity.parsers.common.AbstractWriter.throwExceptionAndClose(AbstractWriter.java:935)
> at
> com.univocity.parsers.common.AbstractWriter.writeRow(AbstractWriter.java:714)
> at
> org.apache.spark.sql.execution.datasources.csv.UnivocityGenerator.write(UnivocityGenerator.scala:84)
> at
> org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.write(CSVFileFormat.scala:181)
> at
> org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
> at
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1439)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at
> org.apache.spark.scheduler.Task.run(Task.scala:123) at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405) at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748) Caused by:
> com.univocity.parsers.common.TextWritingException: Error writing row.
> Internal state when error was thrown: recordCount=18449,
> recordCharacters={obfuscated} at
> com.univocity.parsers.common.AbstractWriter.throwExceptionAndClose(AbstractWriter.java:920)
> at
> com.univocity.parsers.common.AbstractWriter.internalWriteRow(AbstractWriter.java:829)
> at
> com.univocity.parsers.common.AbstractWriter.writeRow(AbstractWriter.java:712)
> ... 17 more Caused by: java.io.InterruptedIOException at
> java.io.PipedInputStream.awaitSpace(PipedInputStream.java:275) at
> java.io.PipedInputStream.receive(PipedInputStream.java:231) at
> java.io.PipedOutputStream.write(PipedOutputStream.java:149) at
> com.amazon.ws.emr.hadoop.fs.cse.CSEMultipartUploadOutputStream.write(CSEMultipartUploadOutputStream.java:242)
> at
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:60)
> at java.io.DataOutputStream.write(DataOutputStream.java:107) at
> sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221) at
> sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282) at
> sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125) at
> java.io.OutputStreamWriter.write(OutputStreamWriter.java:207) at
> com.univocity.parsers.common.input.WriterCharAppender.writeCharsAndReset(WriterCharAppender.java:153)
> at
> com.univocity.parsers.common.AbstractWriter.internalWriteRow(AbstractWriter.java:826)
> ... 18 more
> 21/01/28 17:22:32 INFO DirectFileOutputCommitter: Nothing to clean up on
> abort since there are no temporary files written
> 21/01/28 17:22:32 ERROR FileFormatWriter: Job job_20210128172219_0045
> aborted.
> 21/01/28 17:22:32 INFO Executor: Executor interrupted and killed task 426.1
> in stage 45.0 (TID 13245), reason: another attempt succeeded
> {quote}
> We'll be bypassing this problem by turning speculation off; ideally, though,
> it seems that speculation should be disabled for this kind of stage (one
> that writes directly to S3).
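
A minimal sketch of the workaround mentioned above. `spark.speculation` is a
standard Spark setting (it defaults to false in upstream Spark but may be
enabled by a cluster's defaults); the job script name is a placeholder:

```shell
# Disable speculative execution for jobs that write directly to S3,
# so a killed duplicate attempt cannot overwrite a completed upload.
spark-submit \
  --conf spark.speculation=false \
  my_job.py
```

Alternatively, avoiding the direct output committer entirely (so tasks write
to a temporary location and commit on success) removes the race without giving
up speculation.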
--
This message was sent by Atlassian Jira
(v8.3.4#803005)