Daehee Han created SPARK-34372:
----------------------------------

             Summary: Speculation results in broken CSV files in Amazon S3
                 Key: SPARK-34372
                 URL: https://issues.apache.org/jira/browse/SPARK-34372
             Project: Spark
          Issue Type: Bug
          Components: Input/Output
    Affects Versions: 2.4.7
         Environment: Amazon EMR with AMI version 5.32.0
            Reporter: Daehee Han


Hi, we've been experiencing some rows get corrupted while partitioned CSV files 
were written to Amazon S3. Some records were found broken without any error on 
Spark. Digging into the root cause, we found out Spark speculation tried to 
upload a partition being uploaded slowly and ended up uploading only a part of 
the partition, letting broken data uploaded to S3.

Here're stacktraces we've found. There are two executor involved - A: the first 
executor which tried to upload the file, but it took much longer than other 
executor (but still succeeded), which made spark speculation cut in and kick 
off another executor B. Executor B started to upload the file too, but was 
interrupted during uploading (killed: another attempt succeeded), and ended up 
uploading only a part of the whole file.

 

Executor A:
{quote}21/01/28 17:22:21 INFO Executor: Running task 426.0 in stage 45.0 (TID 
13201) 
21/01/28 17:22:21 INFO ShuffleBlockFetcherIterator: Getting 470 non-empty 
blocks including 10 local blocks and 460 remote blocks 
21/01/28 17:22:21 INFO ShuffleBlockFetcherIterator: Started 46 remote fetches 
in 18 ms 
21/01/28 17:22:21 INFO FileOutputCommitter: File Output Committer Algorithm 
version is 2 
21/01/28 17:22:21 INFO FileOutputCommitter: FileOutputCommitter skip cleanup 
_temporary folders under output directory:false, ignore cleanup failures: true 
21/01/28 17:22:21 INFO DirectFileOutputCommitter: Direct Write: ENABLED 
21/01/28 17:22:21 INFO SQLConfCommitterProvider: Using output committer class
21/01/28 17:22:21 INFO  INFO CSEMultipartUploadOutputStream: close closed:false 
s3://\{obfuscated}/part-00426-7d5677a9-f740-4db6-9d3c-dc589d75e965-c000.csv
21/01/28 17:22:31 INFO DefaultMultipartUploadDispatcher: Completed multipart 
upload of 1 parts 8461990 bytes 
21/01/28 17:22:31 INFO CSEMultipartUploadOutputStream: Finished uploading 
\{obfuscated}/part-00426-7d5677a9-f740-4db6-9d3c-dc589d75e965-c000.csv. Elapsed 
seconds: 10. 
21/01/28 17:22:31 INFO SparkHadoopMapRedUtil: No need to commit output of task 
because needsTaskCommit=false: attempt_20210128172219_0045_m_000426_13201 
21/01/28 17:22:31 INFO Executor: Finished task 426.0 in stage 45.0 (TID 13201). 
8782 bytes result sent to driver{quote}
Executor B:
{quote}21/01/28 17:22:31 INFO CoarseGrainedExecutorBackend: Got assigned task 
13245 21/01/28 17:22:31 INFO Executor: Running task 426.1 in stage 45.0 (TID 
13245) 21/01/28 17:22:31 INFO ShuffleBlockFetcherIterator: Getting 470 
non-empty blocks including 11 local blocks and 459 remote blocks 
21/01/28 17:22:31 INFO ShuffleBlockFetcherIterator: Started 46 remote fetches 
in 2 ms 
21/01/28 17:22:31 INFO FileOutputCommitter: File Output Committer Algorithm 
version is 2 
21/01/28 17:22:31 INFO FileOutputCommitter: FileOutputCommitter skip cleanup 
_temporary folders under output directory:false, ignore cleanup failures: true 
21/01/28 17:22:31 INFO DirectFileOutputCommitter: Direct Write: ENABLED 
21/01/28 17:22:31 INFO SQLConfCommitterProvider: Using output committer class 
org.apache.hadoop.mapreduce.lib.output.DirectFileOutputCommitter 
21/01/28 17:22:31 INFO Executor: Executor is trying to kill task 426.1 in stage 
45.0 (TID 13245), reason: another attempt succeeded 
21/01/28 17:22:31 INFO CSEMultipartUploadOutputStream: close closed:false 
s3://\{obfuscated}/part-00426-7d5677a9-f740-4db6-9d3c-dc589d75e965-c000.csv 
21/01/28 17:22:32 INFO DefaultMultipartUploadDispatcher: Completed multipart 
upload of 1 parts 3145728 bytes 
21/01/28 17:22:32 INFO CSEMultipartUploadOutputStream: Finished uploading 
\{obfuscated}/part-00426-7d5677a9-f740-4db6-9d3c-dc589d75e965-c000.csv. Elapsed 
seconds: 0. 
21/01/28 17:22:32 ERROR Utils: Aborting task 
com.univocity.parsers.common.TextWritingException: Error writing row. Internal 
state when error was thrown: recordCount=18449, recordData=[\{obfuscated}] at 
com.univocity.parsers.common.AbstractWriter.throwExceptionAndClose(AbstractWriter.java:935)
 at 
com.univocity.parsers.common.AbstractWriter.writeRow(AbstractWriter.java:714) 
at 
org.apache.spark.sql.execution.datasources.csv.UnivocityGenerator.write(UnivocityGenerator.scala:84)
 at 
org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.write(CSVFileFormat.scala:181)
 at 
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
 at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1439)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at 
org.apache.spark.scheduler.Task.run(Task.scala:123) at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) Caused by: 
com.univocity.parsers.common.TextWritingException: Error writing row. Internal 
state when error was thrown: recordCount=18449, recordCharacters=\{obfuscated} 
at 
com.univocity.parsers.common.AbstractWriter.throwExceptionAndClose(AbstractWriter.java:920)
 at 
com.univocity.parsers.common.AbstractWriter.internalWriteRow(AbstractWriter.java:829)
 at 
com.univocity.parsers.common.AbstractWriter.writeRow(AbstractWriter.java:712) 
... 17 more Caused by: java.io.InterruptedIOException at 
java.io.PipedInputStream.awaitSpace(PipedInputStream.java:275) at 
java.io.PipedInputStream.receive(PipedInputStream.java:231) at 
java.io.PipedOutputStream.write(PipedOutputStream.java:149) at 
com.amazon.ws.emr.hadoop.fs.cse.CSEMultipartUploadOutputStream.write(CSEMultipartUploadOutputStream.java:242)
 at 
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:60)
 at java.io.DataOutputStream.write(DataOutputStream.java:107) at 
sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221) at 
sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282) at 
sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125) at 
java.io.OutputStreamWriter.write(OutputStreamWriter.java:207) at 
com.univocity.parsers.common.input.WriterCharAppender.writeCharsAndReset(WriterCharAppender.java:153)
 at 
com.univocity.parsers.common.AbstractWriter.internalWriteRow(AbstractWriter.java:826)
 ... 18 more 
21/01/28 17:22:32 INFO DirectFileOutputCommitter: Nothing to clean up on abort 
since there are no temporary files written 
21/01/28 17:22:32 ERROR FileFormatWriter: Job job_20210128172219_0045 aborted. 
21/01/28 17:22:32 INFO Executor: Executor interrupted and killed task 426.1 in 
stage 45.0 (TID 13245), reason: another attempt succeeded{quote}
We'll be bypassing this problem by setting speculation off, however ideally it 
seems that either speculation should be disabled in this stage (writing to S3).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to