[ 
https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16327649#comment-16327649
 ] 

Steve Loughran commented on SPARK-23050:
----------------------------------------

{quote} Is there an API to detect S3 like file systems?
{quote}
Not really, no, because they are all so fairly different.. HADOOP-9565 
documents different attempts in the past do to a cloud-store specific API with 
both capabilities and operations (PUT, LIST, ...). Problem there is that to do 
the interesting stuff you need to move off lowest-common-denominator, anything 
with some "Blobstore" interface to look for is limited and potentially too 
pessemistic (WASB and ADLS stronger consistency, etc)

 

I'd expect in future to at least declare FS Semantics more, the way Hadoop IO 
stream's now have a {{StreamCapabilities.hasCapabilities(string)}} probe. 
HADOOP-14707 formalises exactly that for filesystems; and S3A in trunk 
implements {{StreamCapabilities}} already.

For now, the one thing which will work everywhere is to have some option for 
checkpointing in place rather than write+rename

w.r.t how best to handle this FNFE here, a busy-wait for a few seconds *should* 
fix s3 inconsistency, but doesn't handle the non-failure-condition of "no file 
written because there was no interesting data", which ORC can, I believe, 
raise. [~dongjoon] will be the expert there

> Structured Streaming with S3 file source duplicates data because of eventual 
> consistency.
> -----------------------------------------------------------------------------------------
>
>                 Key: SPARK-23050
>                 URL: https://issues.apache.org/jira/browse/SPARK-23050
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Yash Sharma
>            Priority: Major
>
> Spark Structured streaming with S3 file source duplicates data because of 
> eventual consistency.
> Re producing the scenario -
> - Structured streaming reading from S3 source. Writing back to S3.
> - Spark tries to commitTask on completion of a task, by verifying if all the 
> files have been written to Filesystem. 
> {{ManifestFileCommitProtocol.commitTask}}.
> - [Eventual consistency issue] Spark finds that the file is not present and 
> fails the task. {{org.apache.spark.SparkException: Task failed while writing 
> rows. No such file or directory 
> 's3://path/data/part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet'}}
> - By this time S3 eventually gets the file.
> - Spark reruns the task and completes the task, but gets a new file name this 
> time. {{ManifestFileCommitProtocol.newTaskTempFile. 
> part-00256-b62fa7a4-b7e0-43d6-8c38-9705076a7ee1-c000.snappy.parquet.}}
> - Data duplicates in results and the same data is processed twice and written 
> to S3.
> - There is no data duplication if spark is able to list presence of all 
> committed files and all tasks succeed.
> Code:
> {code}
> query = selected_df.writeStream \
>     .format("parquet") \
>     .option("compression", "snappy") \
>     .option("path", "s3://path/data/") \
>     .option("checkpointLocation", "s3://path/checkpoint/") \
>     .start()
> {code}
> Same sized duplicate S3 Files:
> {code}
> $ aws s3 ls s3://path/data/ | grep part-00256
> 2018-01-11 03:37:00      17070 
> part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet
> 2018-01-11 03:37:10      17070 
> part-00256-b62fa7a4-b7e0-43d6-8c38-9705076a7ee1-c000.snappy.parquet
> {code}
> Exception on S3 listing and task failure:
> {code}
> [Stage 5:========================>                            (277 + 100) / 
> 597]18/01/11 03:36:59 WARN TaskSetManager: Lost task 256.0 in stage 5.0 (TID  
> org.apache.spark.SparkException: Task failed while writing rows
>       at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
>       at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
>       at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>       at org.apache.spark.scheduler.Task.run(Task.scala:108)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.io.FileNotFoundException: No such file or directory 
> 's3://path/data/part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet'
>       at 
> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:816)
>       at 
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:509)
>       at 
> org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
>       at 
> org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at 
> org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitTask(ManifestFileCommitProtocol.scala:109)
>       at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:260)
>       at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
>       at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
>       at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
>       ... 8 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to