Issue:
I have a random forest model that I am trying to load during streaming using 
the following code. The code works fine when I run it from Eclipse, but I get 
an NPE when I run it using spark-submit.

JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(duration));
System.out.println("&&&&&&&&&&&&&&&&&&&&& trying to get the context &&&&&&&&&&&&&&&&&&& ");
final RandomForestModel model = RandomForestModel.load(jssc.sparkContext().sc(), MODEL_DIRECTORY); // line 116, causing the issue
System.out.println("&&&&&&&&&&&&&&&&&&&&& model debug &&&&&&&&&&&&&&&&&&&&&&& " + model.toDebugString());


Exception Details:
INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 2.0, whose tasks have all completed, from pool
Exception in thread "main" java.lang.NullPointerException
        at org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$SplitData.toSplit(DecisionTreeModel.scala:144)
        at org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$$anonfun$16.apply(DecisionTreeModel.scala:291)
        at org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$$anonfun$16.apply(DecisionTreeModel.scala:291)
        at scala.Option.map(Option.scala:145)
        at org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$.constructNode(DecisionTreeModel.scala:291)
        at org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$.constructNode(DecisionTreeModel.scala:286)
        at org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$.constructNode(DecisionTreeModel.scala:287)
        at org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$.constructNode(DecisionTreeModel.scala:286)
        at org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$.constructTree(DecisionTreeModel.scala:268)
        at org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$$anonfun$12.apply(DecisionTreeModel.scala:251)
        at org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$$anonfun$12.apply(DecisionTreeModel.scala:250)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
        at org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$.constructTrees(DecisionTreeModel.scala:250)
        at org.apache.spark.mllib.tree.model.TreeEnsembleModel$SaveLoadV1_0$.loadTrees(treeEnsembleModels.scala:340)
        at org.apache.spark.mllib.tree.model.RandomForestModel$.load(treeEnsembleModels.scala:72)
        at org.apache.spark.mllib.tree.model.RandomForestModel.load(treeEnsembleModels.scala)
        at com.markmonitor.antifraud.ce.KafkaURLStreaming.main(KafkaURLStreaming.java:116)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Nov 19, 2015 1:10:56 PM WARNING: parquet.hadoop.ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
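
Since the same code behaves differently under Eclipse and spark-submit, one thing that could be compared between the two runs is which jar each class is actually loaded from. The sketch below is only a diagnostic aid under the assumption that the two environments build different classpaths; nothing in the trace above confirms that. `ClassOrigin` and `originOf` are hypothetical names introduced for illustration; the two class names passed in `main` are taken from the stack trace.

```java
import java.security.CodeSource;

public class ClassOrigin {

    // Returns the jar or directory a class was loaded from.
    // The class is looked up without running its static initializers.
    static String originOf(String className) {
        try {
            Class<?> cls = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                // Typical for classes loaded by the JVM bootstrap loader.
                return "<bootstrap or unknown>";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "<not on classpath>";
        }
    }

    public static void main(String[] args) {
        // Class names copied from the stack trace; run once from Eclipse and
        // once via spark-submit, then compare the printed locations.
        String[] names = {
            "org.apache.spark.mllib.tree.model.RandomForestModel",
            "scala.Option"
        };
        for (String name : names) {
            System.out.println(name + " -> " + originOf(name));
        }
    }
}
```

If the printed locations differ between the two runs, that would at least show that different jars are in play; if they match, the cause lies elsewhere.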

Spark Source Code:
case class PredictData(predict: Double, prob: Double) {
  def toPredict: Predict = new Predict(predict, prob)
}

Thanks,

Rachana

