I think it is not a problem of join hints, but rather of too little memory for the join operator. If you set the temporary path (setTemporaryPath), the job is split into smaller parts, so each operator gets more memory. Alternatively, you can increase the memory you give to the TaskManagers.
The problem with the NullPointerException won't be solved by this, though. Could you send the full stack trace for that?

Cheers,
Till

On Jun 4, 2015 7:10 PM, "Andra Lungu" <[email protected]> wrote:
> Hi Felix,
>
> Passing a JoinHint to your function should help.
> See:
>
> http://mail-archives.apache.org/mod_mbox/flink-user/201504.mbox/%3ccanc1h_vffbqyyiktzcdpihn09r4he4oluiursjnci_rwc+c...@mail.gmail.com%3E
>
> Cheers,
> Andra
>
> On Thu, Jun 4, 2015 at 7:07 PM, Felix Neutatz <[email protected]> wrote:
>
>> After the bug fix:
>>
>> For 100 blocks and the standard JVM heap space:
>>
>> Caused by: java.lang.RuntimeException: Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident. Probably cause: Too many duplicate keys.
>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:718)
>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:506)
>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:543)
>>     at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
>>     at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
>>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> For 150 blocks and 5G JVM heap space:
>>
>> Caused by: java.lang.NullPointerException
>>     at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
>>     ...
>>
>> Best regards,
>> Felix
>>
>> 2015-06-04 10:19 GMT+02:00 Felix Neutatz <[email protected]>:
>>
>>> Yes, I will try it again with the newest update :)
>>>
>>> 2015-06-04 10:17 GMT+02:00 Till Rohrmann <[email protected]>:
>>>
>>>> If the first error is not fixed by Chiwan's PR, then we should create a
>>>> JIRA issue for it so we don't forget it.
>>>>
>>>> @Felix: Chiwan's PR is here [1]. Could you try to run ALS again with this
>>>> version?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> [1] https://github.com/apache/flink/pull/751
>>>>
>>>> On Thu, Jun 4, 2015 at 10:10 AM, Chiwan Park <[email protected]> wrote:
>>>>
>>>>> Hi. The second bug is fixed by the recent change in the PR.
>>>>> But there is simply no test case for the first bug.
>>>>>
>>>>> Regards,
>>>>> Chiwan Park
>>>>>
>>>>>> On Jun 4, 2015, at 5:09 PM, Ufuk Celebi <[email protected]> wrote:
>>>>>>
>>>>>> I think both are bugs. They are triggered by the different memory
>>>>>> configurations.
>>>>>>
>>>>>> @chiwan: is the 2nd error fixed by your recent change?
>>>>>>
>>>>>> @felix: if yes, can you try the 2nd run again with the changes?
>>>>>>
>>>>>> On Thursday, June 4, 2015, Felix Neutatz <[email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I played a bit with the ALS recommender algorithm. I used the MovieLens
>>>>>>> dataset:
>>>>>>> http://files.grouplens.org/datasets/movielens/ml-latest-README.html
>>>>>>>
>>>>>>> The rating matrix has 21,063,128 entries (ratings).
>>>>>>>
>>>>>>> I ran the algorithm with 3 configurations:
>>>>>>>
>>>>>>> 1. Standard JVM heap space:
>>>>>>>
>>>>>>> val als = ALS()
>>>>>>>   .setIterations(10)
>>>>>>>   .setNumFactors(10)
>>>>>>>   .setBlocks(100)
>>>>>>>
>>>>>>> throws:
>>>>>>> java.lang.RuntimeException: Hash Join bug in memory management: Memory buffers leaked.
>>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:733)
>>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
>>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
>>>>>>>     at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
>>>>>>>     at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
>>>>>>>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>>>>>>>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> 2. 5G JVM heap space:
>>>>>>>
>>>>>>> val als = ALS()
>>>>>>>   .setIterations(10)
>>>>>>>   .setNumFactors(10)
>>>>>>>   .setBlocks(150)
>>>>>>>
>>>>>>> throws:
>>>>>>> java.lang.NullPointerException
>>>>>>>     at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
>>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1090)
>>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:923)
>>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:779)
>>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
>>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
>>>>>>>     at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
>>>>>>>     at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
>>>>>>>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>>>>>>>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> 3. 14G JVM heap space:
>>>>>>>
>>>>>>> val als = ALS()
>>>>>>>   .setIterations(10)
>>>>>>>   .setNumFactors(10)
>>>>>>>   .setBlocks(150)
>>>>>>>   .setTemporaryPath("/tmp/tmpALS")
>>>>>>>
>>>>>>> -> works
>>>>>>>
>>>>>>> Is this a Flink problem, or is it just a bad configuration on my side?
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Felix
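The two mechanisms discussed in this thread, too little memory for the join operator and the "Too many duplicate keys" failure, can be illustrated with a toy model of recursive hash partitioning. This is a minimal Scala sketch under simplifying assumptions, not Flink's MutableHashTable: memory is modeled as a plain record count (capacity), and the names partitionOf and roundsNeeded, as well as the fan-out and maximum-depth values, are hypothetical. A partition that does not fit is re-partitioned with a different hash function per level, up to a fixed number of rounds.

```scala
// Toy model of recursive hash partitioning in a hash join.
// ASSUMPTIONS (hypothetical, NOT Flink's MutableHashTable): a partition "fits"
// if it holds at most `capacity` build-side records; otherwise it is split into
// `fanOut` parts with a level-dependent hash, at most `maxDepth` times.

// Level-dependent hash: the same key may land in different buckets per level.
def partitionOf(key: Int, level: Int, fanOut: Int): Int =
  Math.floorMod((key, level).hashCode, fanOut)

// Number of partitioning rounds until every partition fits,
// or None if some partition is still too large after `maxDepth` rounds.
def roundsNeeded(keys: Seq[Int], capacity: Int, fanOut: Int,
                 maxDepth: Int, level: Int = 0): Option[Int] =
  if (keys.size <= capacity) Some(level)
  else if (level == maxDepth) None
  else {
    val parts  = keys.groupBy(k => partitionOf(k, level, fanOut)).values.toSeq
    val rounds = parts.map(p => roundsNeeded(p, capacity, fanOut, maxDepth, level + 1))
    if (rounds.contains(None)) None else Some(rounds.flatten.max)
  }

val distinctKeys = 0 until 10000                        // 10,000 different keys
val skewedKeys   = Seq.fill(5000)(42) ++ (0 until 5000) // key 42 repeated 5,000 times

// Too little memory: one round cannot split distinct keys finely enough.
println(roundsNeeded(distinctKeys, capacity = 100, fanOut = 8, maxDepth = 1))   // None
// More memory per operator: the same input fits after one round.
println(roundsNeeded(distinctKeys, capacity = 2000, fanOut = 8, maxDepth = 1))  // Some(1)
// Duplicate keys: every copy of key 42 hashes to the same partition at every
// level, so re-partitioning never shrinks that partition below the capacity.
println(roundsNeeded(skewedKeys, capacity = 1000, fanOut = 8, maxDepth = 3))    // None
// Enough memory for the whole input: no partitioning needed at all.
println(roundsNeeded(skewedKeys, capacity = 20000, fanOut = 8, maxDepth = 3))   // Some(0)
```

Raising capacity stands in for giving each join operator more memory: the distinct keys then resolve after one round. The skewed input keeps failing however many rounds are allowed (as long as capacity stays below the 5,000 copies of key 42), because re-hashing cannot separate identical keys; only a capacity large enough to hold all of that key's records avoids the error.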
