Hi,
I played a bit with the ALS recommender algorithm, using the MovielLens
dataset: http://files.grouplens.org/datasets/movielens/ml-latest-README.html
The rating matrix has 21,063,128 entries (ratings).
I ran the algorithm with 3 configurations:
1. Standard JVM heap space:
val als = ALS()
  .setIterations(10)
  .setNumFactors(10)
  .setBlocks(100)
throws:
java.lang.RuntimeException: Hash Join bug in memory management: Memory buffers leaked.
    at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:733)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
    at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
    at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
2. 5G JVM heap space:
val als = ALS()
  .setIterations(10)
  .setNumFactors(10)
  .setBlocks(150)
throws:
java.lang.NullPointerException
    at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1090)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:923)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:779)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
    at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
    at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
3. 14G JVM heap space:
val als = ALS()
  .setIterations(10)
  .setNumFactors(10)
  .setBlocks(150)
  .setTemporaryPath("/tmp/tmpALS")
-> works
Is this a Flink problem or is it just my bad configuration?
Best regards,
Felix