Hi. I'm trying to run the spark-itemsimilarity job on a dataset of mine and
am running into Java heap space OOM errors (stack trace below). The job
fails on a reduce step at SparkEngine.scala:79, which reduces with a +=
operation. Presumably this builds, for each item, a DenseVector holding its
similarity scores against all other items. Is that vector meant to be held
entirely in memory? If so, it seems the job would fail once the item count
reaches the tens of thousands (my guess). Is this a correct assumption? A
sketch of my mental model follows below.
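To make the question concrete, here is my guess at the shape of that reduce
(the names are hypothetical, not the actual Mahout code): one dense row of
scores per item key, merged pairwise with +=.

  import org.apache.spark.SparkContext._
  import org.apache.spark.rdd.RDD

  // Hypothetical sketch, not the real SparkEngine code: each item maps to
  // a dense row of n similarity scores, and rows for the same item are
  // merged with +=. Every row is an O(n) array that has to fit on the
  // heap while the shuffle serializes it.
  def mergeScoreRows(rows: RDD[(Int, Array[Double])]): RDD[(Int, Array[Double])] =
    rows.reduceByKey { (a, b) =>
      var i = 0  // a and b are assumed to have equal length (n items)
      while (i < a.length) { a(i) += b(i); i += 1 }  // the += step
      a
    }

Back-of-envelope, if that model is right: at n = 50,000 items a single
dense row is 50,000 * 8 bytes ~= 400 KB, but a full dense n x n block is
50,000^2 * 8 bytes ~= 20 GB, which would explain the heap blowing up inside
the MatrixWritable serialization shown in the trace. Here is the trace: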
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
    at java.io.DataOutputStream.writeDouble(DataOutputStream.java:259)
    at org.apache.mahout.math.VectorWritable.writeVectorContents(VectorWritable.java:213)
    at org.apache.mahout.math.MatrixWritable.writeMatrix(MatrixWritable.java:194)
    at org.apache.mahout.math.MatrixWritable.write(MatrixWritable.java:56)
    at org.apache.mahout.sparkbindings.io.WritableKryoSerializer.write(WritableKryoSerializer.scala:29)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
    at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:751)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:750)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:750)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:746)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:746)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
Thanks,
Andy