[ 
https://issues.apache.org/jira/browse/BEAM-2095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16006299#comment-16006299
 ] 

Stas Levin edited comment on BEAM-2095 at 5/11/17 12:04 PM:
------------------------------------------------------------

Hi Arvid,

Sorry I did not provide any context here.

I was hoping you'd be willing to check your issue against the {{master}} 
branch, which is where this fix is currently merged. 
Regarding your concern, I'm not sure whether there were any breaking API 
changes between {{0.6}} and {{master}}.

The motivation is that if this fix does resolve the problem you reported, 
we can make an effort to get it into version {{2.0.0}} (a.k.a. the stable 
release), which is being worked on as we speak.

Also, to be clear, {{Beam 2.0.0}} does not imply that {{spark-runner}} is 
migrating to {{Spark 2.0.0}}; for now it keeps building on {{Spark 1.6.3}}.


> The hasNext method of the iterator returned by SourceRDD#compute is not 
> idempotent
> ----------------------------------------------------------------------------------
>
>                 Key: BEAM-2095
>                 URL: https://issues.apache.org/jira/browse/BEAM-2095
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 0.6.0
>            Reporter: Arvid Heise
>            Assignee: Stas Levin
>             Fix For: 2.1.0
>
>
> When reading an Avro file from HDFS with the new HDFSFileSource, we get the 
> following exception:
> {code}
> 17/04/27 11:48:38 ERROR executor.Executor: Exception in task 2.0 in stage 1.0 (TID 32)
> java.util.NoSuchElementException
>       at com.gfk.hyperlane.engine.target_group_evaluation.dataset.HDFSFileSource$HDFSFileReader.getCurrent(HDFSFileSource.java:498)
>       at org.apache.beam.runners.spark.io.SourceRDD$Bounded$1.next(SourceRDD.java:142)
>       at org.apache.beam.runners.spark.io.SourceRDD$Bounded$1.next(SourceRDD.java:111)
>       at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
>       at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>       at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>       at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>       at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30)
>       at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:165)
>       at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
>       at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
>       at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:162)
>       at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
>       at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
>       at org.apache.beam.runners.spark.translation.SparkProcessContext.processPartition(SparkProcessContext.java:64)
>       at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:105)
>       at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:48)
>       at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>       at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>       at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>       at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>       at org.apache.spark.scheduler.Task.run(Task.scala:89)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> The error comes from a call to BoundedReader#getCurrent after it has been 
> closed.
> We logged the following call patterns:
> (for data)
>   advance
>   getCurrent
> (when drained)
> advance
>   close
> getCurrent
> The issue probably comes from the implementation in SourceRDD 
> https://github.com/apache/beam/blob/3101e69c438d5c42577fc7d3476d623f6e551837/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java#L145
> Each call to hasNext triggers a call to advance, so calling hasNext 
> repeatedly without calling next skips elements. This loses data, can return 
> different results between calls, and is likely what causes the exception 
> observed above.
> The usual solution is for hasNext() to fetch and cache the next element when 
> the cache is empty, and for next() to return the cached element and clear 
> the cache.
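
For illustration, here is a minimal sketch of the caching approach described in the issue. It is not the actual SourceRDD code or the merged fix. {{SimpleReader}}, {{ReaderIterator}} and {{ListReader}} are hypothetical names made up for this example, and {{SimpleReader}} is only a simplified stand-in for BoundedReader, not the real Beam API.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class IdempotentHasNextSketch {

  /** Hypothetical, simplified stand-in for a Beam BoundedReader (not the real API). */
  interface SimpleReader<T> {
    boolean advance();  // moves to the next record; returns false once drained
    T getCurrent();     // only valid after advance() returned true
    void close();
  }

  /** Iterator whose hasNext() advances the reader at most once per element. */
  static final class ReaderIterator<T> implements Iterator<T> {
    private final SimpleReader<T> reader;
    private T cached;
    private boolean hasCached = false;
    private boolean finished = false;

    ReaderIterator(SimpleReader<T> reader) {
      this.reader = reader;
    }

    @Override
    public boolean hasNext() {
      if (hasCached) {
        return true;   // repeated calls reuse the cached element
      }
      if (finished) {
        return false;  // never touch the reader again after it was closed
      }
      if (reader.advance()) {
        cached = reader.getCurrent();
        hasCached = true;
        return true;
      }
      finished = true;
      reader.close();
      return false;
    }

    @Override
    public T next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      T result = cached;
      cached = null;      // reset the cache so the next hasNext() advances again
      hasCached = false;
      return result;
    }
  }

  /** Toy in-memory reader, used only to exercise the iterator. */
  static final class ListReader<T> implements SimpleReader<T> {
    private final List<T> items;
    private int index = -1;
    private boolean closed = false;

    ListReader(List<T> items) {
      this.items = items;
    }

    @Override
    public boolean advance() {
      if (closed) {
        throw new IllegalStateException("advance after close");
      }
      index++;
      return index < items.size();
    }

    @Override
    public T getCurrent() {
      if (closed || index < 0 || index >= items.size()) {
        throw new NoSuchElementException();
      }
      return items.get(index);
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  public static void main(String[] args) {
    Iterator<String> it =
        new ReaderIterator<>(new ListReader<>(Arrays.asList("a", "b")));
    it.hasNext();
    it.hasNext();                      // repeated call must not skip "a"
    System.out.println(it.next());     // prints "a"
    System.out.println(it.next());     // prints "b"
    System.out.println(it.hasNext());  // prints "false"
  }
}
```

With this shape, hasNext() can be called any number of times between calls to next() without advancing the reader again, and getCurrent is never called once the reader has been closed.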



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
