Looks like your MyCustomKeyType.equals() method doesn't correctly handle a null argument. In general, the contract of equals is to return false if called with a null argument, which this code currently relies on.
This could still be patched for the sake of backwards compatibility with similarly incomplete equals() methods which previously worked. On Mon, Jan 27, 2014 at 11:34 AM, Archit Thakur <archit279tha...@gmail.com>wrote: > ERROR executor.Executor: Exception in task ID 20 > java.lang.NullPointerException > at > > com.xyz.spark.common.collection.MyCustomKeyType.equals(MyCustomKeyType.java:200) > at > org.apache.spark.util.AppendOnlyMap.changeValue(AppendOnlyMap.scala:122) > at > org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:42) > at > > org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$3.apply(PairRDDFunctions.scala:103) > at > > org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$3.apply(PairRDDFunctions.scala:102) > at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:465) > at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:465) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:226) > at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:226) > at > org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107) > at org.apache.spark.scheduler.Task.run(Task.scala:53) > at > > org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215) > at > org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182) > at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown > Source) > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown > Source) > at java.lang.Thread.run(Unknown Source) > > > > On Mon, Jan 27, 2014 at 10:54 PM, Reynold Xin <r...@databricks.com> wrote: > > > Do you mind pasting the whole stack trace for the NPE? > > > > > > > > On Mon, Jan 27, 2014 at 6:44 AM, Archit Thakur < > archit279tha...@gmail.com > > >wrote: > > > > > Hi, > > > > > > Implementation of aggregation logic has been changed with 0.8.1 > > > (Aggregator.scala) > > > > > > It is now using AppendOnlyMap as compared to java.util.HashMap in 0.8.0 > > > release. > > > > > > Aggregator.scala > > > def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : > > Iterator[(K, > > > C)] = { > > > val combiners = new AppendOnlyMap[K, C] > > > var kv: Product2[K, V] = null > > > val update = (hadValue: Boolean, oldValue: C) => { > > > if (hadValue) mergeValue(oldValue, kv._2) else > > createCombiner(kv._2) > > > } > > > while (iter.hasNext) { > > > kv = iter.next() > > > combiners.changeValue(kv._1, update) > > > } > > > combiners.iterator > > > } > > > > > > I am facing problem that in changeValue function of AppendOnlyMap, it > > > computes, > > > val curKey = data(2 * pos) > > > which is coming as null and eventually giving NPE. > > > > > > AppendOnlyMap.scala > > > def changeValue(key: K, updateFunc: (Boolean, V) => V): V = { > > > val k = key.asInstanceOf[AnyRef] > > > if (k.eq(null)) { > > > if (!haveNullValue) { > > > incrementSize() > > > } > > > nullValue = updateFunc(haveNullValue, nullValue) > > > haveNullValue = true > > > return nullValue > > > } > > > var pos = rehash(k.hashCode) & mask > > > var i = 1 > > > while (true) { > > > val curKey = data(2 * pos) > > > if (k.eq(curKey) || k.equals(curKey)) { > > > val newValue = updateFunc(true, data(2 * pos + > > 1).asInstanceOf[V]) > > > data(2 * pos + 1) = newValue.asInstanceOf[AnyRef] > > > return newValue > > > } else if (curKey.eq(null)) { > > > val newValue = updateFunc(false, null.asInstanceOf[V]) > > > data(2 * pos) = k > > > data(2 * pos + 1) = newValue.asInstanceOf[AnyRef] > > > incrementSize() > > > return newValue > > > } else { > > > val delta = i > > > pos = (pos + delta) & mask > > > i += 1 > > > } > > > } > > > null.asInstanceOf[V] // Never reached but needed to keep compiler > > happy > > > } > > > > > > > > > Other info: > > > 1. My code works fine with 0.8.0. > > > 2. I used groupByKey transformation. > > > 3. I replaces the Aggregator.scala with the older version(0.8.0), > > compiled > > > it, Restarted Master and Worker, It ran successfully. > > > > > > Thanks and Regards, > > > Archit Thakur. > > > > > >