Is it possible to write a short test which exhibits this problem ?

For Spark 2.0, this part of code has changed:

[SPARK-4819] Remove Guava's "Optional" from public API

FYI

On Fri, May 6, 2016 at 6:57 AM, Adam Westerman <aswes...@gmail.com> wrote:

> Hi,
>
> I’m attempting to do a left outer join in Spark, and I’m getting an NPE
> that appears to be due to some Spark Java API bug. (I’m running Spark 1.6.0
> in local mode on a Mac).
>
> For a little background, the left outer join returns all keys from the
> left side of the join regardless of whether or not the key is present on
> the right side.  To handle this uncertainty, the value from the right side
> is wrapped in Guava’s Optional class.  The Optional class has a method to
> check whether the value is present or not (which would indicate the key
> appeared in both RDDs being joined).  If the key was indeed present in both
> RDDs you can then retrieve the value and move forward.
>
> After doing a little digging, I found that Spark is using Scala’s Option
> functionality internally.  This is the same concept as the Guava Optional,
> only native to Scala.  It appears that during the conversion from a Scala
> Option back to a Guava Optional (this method can be found here:
> https://github.com/apache/spark/blob/v1.6.0/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala#L28)
>  the
> conversion method is erroneously passed a Scala Option with the String
> value “None” instead of Scala’s null value None.  This is matched to the
> first *case*, which causes Guava’s Optional.of method to attempt to pull
> the value out.  A NPE is thrown since it wasn’t ever actually there.
>
> The code basically looks like this, where the classes used are just plain
> Java objects with some class attributes inside:
> // First RDD
> JavaPairRDD<GroupItemNode, WeekItemComposite> rdd1
> // Second RDD
> JavaPairRDD<GroupItemNode, Inventory> rdd2
>
> // Resultant RDD
> JavaPairRDD<GroupItemNode, Tuple2<WeekItemComposite, Optional<Inventory>>>
> result = rdd1.leftOuterJoin(rdd2)
>
> Has anyone ever encountered this problem before, or know why the
> optionToOptional method might be getting passed this “None” value?  I’ve
> added some more relevant information below, let me know if I can provide
> any more details.
>
> Here's a screenshot showing the string value of “None” being passed into
> the optionToOptional method using the debugger:
>
> Here’s the stack trace (the method shown above is highlighted):
>
> ERROR 13:17:00,743 com.tgt.allocation.needengine.NeedEngineApplication
> Exception while running need engine:
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 8
> in stage 31.0 failed 1 times, most recent failure: Lost task 8.0 in stage
> 31.0 (TID 50, localhost): java.lang.NullPointerException
> at
> org.spark-project.guava.base.Preconditions.checkNotNull(Preconditions.java:191)
> at com.google.common.base.Optional.of(Optional.java:86)
> at org.apache.spark.api.java.JavaUtils$.optionToOptional
> (JavaUtils.scala:30)
> at
> org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
> at
> org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> <http://org.apache.spark.scheduler.dagscheduler.org/>
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
> at org.apache.spark.rdd.RDD.count(RDD.scala:1143)
> at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:440)
> at
> org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:46)
> at
> com.tgt.allocation.needengine.spark.processor.NeedEngineProcessor.runProcessor(NeedEngineProcessor.java:43)
> at
> com.tgt.allocation.needengine.spark.processor.SparkProcessor.runProcessor(SparkProcessor.java:68)
> at
> com.tgt.allocation.needengine.service.NeedEngineService.runProcessor(NeedEngineService.java:47)
> at
> com.tgt.allocation.needengine.NeedEngineApplication.main(NeedEngineApplication.java:29)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: java.lang.NullPointerException
> at
> org.spark-project.guava.base.Preconditions.checkNotNull(Preconditions.java:191)
> at com.google.common.base.Optional.of(Optional.java:86)
> at org.apache.spark.api.java.JavaUtils$.optionToOptional
> (JavaUtils.scala:30)
> at
> org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
> at
> org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>  WARN 13:17:00,744 org.apache.spark.Logging$class Lost task 9.0 in stage
> 31.0 (TID 51, localhost): TaskKilled (killed intentionally)
>
> Thank you for any help you may be able to provide,
>
> Adam Westerman
>

Reply via email to