Arvid Heise created BEAM-2095:
---------------------------------

             Summary: SourceRDD hasNext not idempotent
                 Key: BEAM-2095
                 URL: https://issues.apache.org/jira/browse/BEAM-2095
             Project: Beam
          Issue Type: Bug
          Components: runner-spark
    Affects Versions: 0.6.0
            Reporter: Arvid Heise
            Assignee: Amit Sela


When reading an Avro file from HDFS with the new HDFSFileSource, we get the 
following exception:

{code}
17/04/27 11:48:38 ERROR executor.Executor: Exception in task 2.0 in stage 1.0 (TID 32)
java.util.NoSuchElementException
        at com.gfk.hyperlane.engine.target_group_evaluation.dataset.HDFSFileSource$HDFSFileReader.getCurrent(HDFSFileSource.java:498)
        at org.apache.beam.runners.spark.io.SourceRDD$Bounded$1.next(SourceRDD.java:142)
        at org.apache.beam.runners.spark.io.SourceRDD$Bounded$1.next(SourceRDD.java:111)
        at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
        at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
        at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
        at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
        at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30)
        at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:165)
        at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
        at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
        at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:162)
        at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
        at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
        at org.apache.beam.runners.spark.translation.SparkProcessContext.processPartition(SparkProcessContext.java:64)
        at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:105)
        at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:48)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{code}

The error comes from a call to BoundedReader#getCurrent after it has been 
closed.

We logged the following call patterns:
(for data)
  advance
  getCurrent
(when drained)
  advance
  close
  getCurrent

The issue probably comes from the implementation in SourceRDD 
https://github.com/apache/beam/blob/3101e69c438d5c42577fc7d3476d623f6e551837/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java#L145

Each repeated call to hasNext results in another call to advance. Elements 
that were advanced past but never fetched are lost, so the output depends on 
how often the caller invokes hasNext. In particular, this can cause the 
exception observed above.
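
The effect can be reproduced in isolation. The following is a minimal sketch, 
not the actual SourceRDD code: ToyReader is a hypothetical stand-in for a 
BoundedReader, and NonIdempotentIterator is a simplified iterator that is 
assumed to call advance on every hasNext.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for a BoundedReader; this is not the Beam API.
class ToyReader {
    private final List<String> data;
    private int pos = -1;
    private boolean closed = false;

    ToyReader(List<String> data) { this.data = data; }

    boolean advance() {
        if (pos < data.size()) pos++;
        return pos < data.size();
    }

    String getCurrent() {
        if (closed || pos < 0 || pos >= data.size()) throw new NoSuchElementException();
        return data.get(pos);
    }

    void close() { closed = true; }
}

// Simplified iterator whose hasNext() advances the reader on every call.
class NonIdempotentIterator implements Iterator<String> {
    private final ToyReader reader;

    NonIdempotentIterator(ToyReader reader) { this.reader = reader; }

    @Override public boolean hasNext() {
        boolean more = reader.advance();   // side effect on every call
        if (!more) reader.close();
        return more;
    }

    @Override public String next() { return reader.getCurrent(); }
}

class NonIdempotentDemo {
    public static void main(String[] args) {
        Iterator<String> it =
            new NonIdempotentIterator(new ToyReader(Arrays.asList("a", "b", "c")));
        it.hasNext();
        it.hasNext();                  // second call silently skips "a"
        System.out.println(it.next()); // prints "b"
    }
}
```

Once the reader is drained, hasNext in this sketch calls advance and close, and 
a following next calls getCurrent on the closed reader, which matches the 
logged "when drained" pattern.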

The usual solution is for hasNext() to retrieve and cache the next element 
when the cache is empty, and for next() to return the cached element and 
reset the cache.
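
A minimal sketch of that caching pattern, under stated assumptions: 
ElementReader, ListReader and CachingReaderIterator are hypothetical names 
introduced for illustration and only mirror the advance/getCurrent/close calls 
logged above. This is not a patch against SourceRDD.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical reader contract modelled on advance/getCurrent/close.
interface ElementReader<T> {
    boolean advance();
    T getCurrent();
    void close();
}

// List-backed reader used only for this illustration.
class ListReader<T> implements ElementReader<T> {
    private final List<T> data;
    private int pos = -1;
    private boolean closed = false;

    ListReader(List<T> data) { this.data = data; }

    @Override public boolean advance() {
        if (pos < data.size()) pos++;
        return pos < data.size();
    }

    @Override public T getCurrent() {
        if (closed || pos < 0 || pos >= data.size()) throw new NoSuchElementException();
        return data.get(pos);
    }

    @Override public void close() { closed = true; }
}

// hasNext() fetches at most one element ahead and caches it; next() hands it out.
class CachingReaderIterator<T> implements Iterator<T> {
    private final ElementReader<T> reader;
    private T cached;
    private boolean hasCached = false;   // separate flag so null elements work
    private boolean finished = false;

    CachingReaderIterator(ElementReader<T> reader) { this.reader = reader; }

    @Override public boolean hasNext() {
        if (hasCached) return true;      // repeated calls have no side effect
        if (finished) return false;      // never touch a closed reader again
        if (reader.advance()) {
            cached = reader.getCurrent();
            hasCached = true;
        } else {
            finished = true;
            reader.close();
        }
        return hasCached;
    }

    @Override public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        T result = cached;
        cached = null;                   // reset the cache
        hasCached = false;
        return result;
    }
}

class CachingIteratorDemo {
    public static void main(String[] args) {
        Iterator<String> it =
            new CachingReaderIterator<>(new ListReader<>(Arrays.asList("a", "b")));
        it.hasNext();
        it.hasNext();                     // does not advance a second time
        System.out.println(it.next());    // prints "a"
        System.out.println(it.next());    // prints "b"
        System.out.println(it.hasNext()); // prints "false"
    }
}
```

In this sketch the reader is advanced at most once per element and closed 
exactly once, and next() without a preceding hasNext() still works because it 
calls hasNext() itself.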


