[ 
https://issues.apache.org/jira/browse/SPARK-18650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15712358#comment-15712358
 ] 

Sean Owen commented on SPARK-18650:
-----------------------------------

I'd be surprised if the partitions were evaluated by multiple threads, but 
it's possible I'm just not seeing how that could happen. Some synchronization 
might, at the least, not hurt.

> race condition in FileScanRDD.scala
> -----------------------------------
>
>                 Key: SPARK-18650
>                 URL: https://issues.apache.org/jira/browse/SPARK-18650
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.2
>         Environment: scala 2.11
> macos 10.11.6
>            Reporter: Jay Goldman
>
> I am attempting to create a Dataset from a single CSV file:
>  val ss: SparkSession = ....
>  val ddr = ss.read.option("path", path)
> ... (choose between xml vs csv parsing)
>  var df = ddr.option("sep", ",")
>           .option("quote", "\"")
>           .option("escape", "\"") // want to retain backslashes (\) ...
>           .option("delimiter", ",")
>           .option("comment", "#")
>           .option("header", "true")
>           .option("format", "csv")
>            ddr.csv(path)
> df.count() returns 2 times the number of lines in the CSV file, i.e., each 
> line of the input file shows up as 2 rows in df. 
> Moreover, df.distinct.count returns the correct number of rows.
> There appears to be a problem in FileScanRDD.compute. I am using Spark 
> version 2.0.1 with Scala 2.11. I am not going to include the entire contents 
> of FileScanRDD.scala here.
> In FileScanRDD.compute there is the following:
>  private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
> If I put a breakpoint in either FileScanRDD.compute or 
> FileScanRDD.nextIterator, the resulting Dataset has the correct number of rows.
> Moreover, the code in FileScanRDD.scala is:
> private def nextIterator(): Boolean = {
>         updateBytesReadWithFileSize()
>         if (files.hasNext) { // breakpoint here => works
>           currentFile = files.next() // breakpoint here => fails
>           ....
>         }
>         else { .... }
> ....
> }
> If I put a breakpoint on the files.hasNext line, all is well; however, if I 
> put a breakpoint on the files.next() line, the code fails when I continue 
> because the files iterator has become empty (see stack trace below). 
> Disabling the breakpoint results in a Dataset with each line of the 
> CSV file duplicated.
> So it appears that multiple threads are using the files iterator or the 
> underlying split value (an RDD partition). Timing-wise, on my system 2 
> workers wind up processing the same file, so the resulting Dataset has 2 
> copies of each of the input lines.
> This code is not active when parsing an XML file. 
> Here is the stack trace:
> java.util.NoSuchElementException: next on empty iterator
>       at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
>       at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
>       at 
> scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
>       at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:111)
>       at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
>  Source)
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>       at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>       at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>       at org.apache.spark.scheduler.Task.run(Task.scala:86)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 16/11/30 09:31:07 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; 
> aborting job
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due 
> to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: 
> Lost task 0.0 in stage 0.0 (TID 0, localhost): 
> java.util.NoSuchElementException: next on empty iterator
>       at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
>       at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
>       at 
> scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
>       at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:111)
>       at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
>  Source)
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>       at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>       at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>       at org.apache.spark.scheduler.Task.run(Task.scala:86)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
>       at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
>       at scala.Option.foreach(Option.scala:257)
>       at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
>       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>       at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)
>       at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>       at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
>       at org.apache.spark.rdd.RDD.collect(RDD.scala:911)
>       at 
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
>       at 
> org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
>       at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
>       at 
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
>       at 
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
>       at 
> org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2227)
>       at 
> org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2226)
>       at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2559)
>       at org.apache.spark.sql.Dataset.count(Dataset.scala:2226)
>       at 
> edu.mit.ll.bb.ingest.LogAsMaps$$anonfun$makeDataset$1.apply(LogAsMaps.scala:275)
>       at 
> edu.mit.ll.bb.ingest.LogAsMaps$$anonfun$makeDataset$1.apply(LogAsMaps.scala:232)
>       at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>       at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>       at edu.mit.ll.bb.ingest.LogAsMaps$.makeDataset(LogAsMaps.scala:232)
>       at edu.mit.ll.bb.ingest.LogAsMaps$.apply(LogAsMaps.scala:179)
>       at edu.mit.ll.bb.ingest.LogAsMaps$.makeLog(LogAsMaps.scala:396)
>       at 
> edu.mit.ll.bb.app.BrowseApp$.delayedEndpoint$edu$mit$ll$bb$app$BrowseApp$1(BrowseApp.scala:103)
>       at 
> edu.mit.ll.bb.app.BrowseApp$delayedInit$body.apply(BrowseApp.scala:18)
>       at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>       at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>       at scala.App$$anonfun$main$1.apply(App.scala:76)
>       at scala.App$$anonfun$main$1.apply(App.scala:76)
>       at scala.collection.immutable.List.foreach(List.scala:381)
>       at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>       at scala.App$class.main(App.scala:76)
>       at edu.mit.ll.bb.app.SingleEventFileApp.main(SingleEventFileApp.scala:6)
>       at edu.mit.ll.bb.app.BrowseApp.main(BrowseApp.scala)
> Caused by: java.util.NoSuchElementException: next on empty iterator
>       at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
>       at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
>       at 
> scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
>       at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:111)
>       at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
>  Source)
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>       at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>       at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>       at org.apache.spark.scheduler.Task.run(Task.scala:86)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Disconnected from the target VM, address: '127.0.0.1:64555', transport: 
> 'socket'



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to