Hi Andra,

I found the cause of the exception. Your test case is simply too complex
for our testing environment.
We restrict the TM memory for test cases to 80MB so that we can execute
multiple tests in parallel on Travis.
I counted the memory consumers in your job and got:

- 2 Combine
- 4 GroupReduce
- 4 CoGroup
- 2 Joins
- 1 SolutionSet

Those are quite a few memory consumers for 20MB per slot (4 slots per TM).
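
A rough back-of-the-envelope sketch of that budget, in case it helps. It assumes the whole 80MB is managed memory and that it is split evenly across slots and memory consumers. Both are simplifications, so treat the per-consumer figure as an upper bound rather than what the runtime actually hands out. The 32KB page size is the one mentioned earlier in this thread; the class and method names are made up for illustration.

```java
public class MemoryBudgetSketch {

    /** Number of whole memory pages that fit into the given amount of memory. */
    static long pages(long memoryMB, int pageSizeKB) {
        return memoryMB * 1024 / pageSizeKB;
    }

    /** Upper bound on pages per consumer, assuming an even split (an assumption). */
    static long pagesPerConsumer(long tmMemoryMB, int slots, int consumers, int pageSizeKB) {
        long pagesPerSlot = pages(tmMemoryMB, pageSizeKB) / slots;
        return pagesPerSlot / consumers;
    }

    public static void main(String[] args) {
        // 2 Combine + 4 GroupReduce + 4 CoGroup + 2 Joins + 1 SolutionSet
        int consumers = 2 + 4 + 4 + 2 + 1;

        System.out.println("memory consumers: " + consumers);
        System.out.println("pages per slot (80MB TM, 4 slots): " + pages(80, 32) / 4);
        System.out.println("pages per consumer (upper bound): "
                + pagesPerConsumer(80, 4, consumers, 32));
        // For comparison: the local run that reported 820 MB of managed memory.
        System.out.println("pages with 820MB managed memory: " + pages(820, 32));
    }
}
```

Under these assumptions a consumer gets at most 49 pages, which is not far above the 33 pages a hash join needs, while the 820MB local setup has 26240 pages to hand out. Since only part of the TM memory is managed and the split is not necessarily even, the real figure for a single operator can be lower.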

Do you see a way to reduce the number of operators in your test case, maybe
by splitting it in half?

2015-03-30 11:01 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:

> Sure,
>
> It was in the first mail but that was sent a while ago :)
>
> This is the code:
> https://github.com/andralungu/gelly-partitioning/tree/alphaSplit
> I also added the log4j file in case it helps!
>
> The error is totally reproducible. 2 out of 2 people got the same.
> Steps to reproduce:
> 1). Clone the code; switch to alphaSplit branch
> 2). Run CounDegreeITCase.java
>
> Hope we can get to the bottom of this! If you need something, just ask.
>
>
> On Mon, Mar 30, 2015 at 10:54 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hmm, that is really weird.
> > Can you point me to a branch in your repository and the test case that
> > gives the error?
> >
> > Then I'll have a look at it and try to figure out what's going wrong.
> >
> > Cheers, Fabian
> >
> > 2015-03-30 10:43 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:
> >
> > > Hello,
> > >
> > > I went on and did some further debugging on this issue. Even though the
> > > exception said that the problem comes from here:
> > > 4837 [Join(Join at weighEdges(NodeSplitting.java:117)) (1/4)] ERROR
> > > org.apache.flink.runtime.operators.RegularPactTask  - Error in task code:
> > > Join(Join at weighEdges(NodeSplitting.java:117)) (1/4)
> > > java.lang.Exception: The data preparation for task 'Join(Join at
> > > weighEdges(NodeSplitting.java:117))' , caused an error: Too few memory
> > > segments provided. Hash Join needs at least 33 memory segments.
> > >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > >     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > >     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
> > >     at java.lang.Thread.run(Thread.java:745)
> > >
> > > which is basically a chain of two joins, a pattern that I have repeated
> > > several times, including in the getTriplets() method, and it passed every
> > > time. I thought that this could not be right!
> > >
> > > So I took each intermediate data set, printed it, and added a
> > > System.exit(0) afterwards. The exception comes from this method:
> > > aggregatePartialValuesSplitVertices. Even though it computes the correct
> > > result, it then throws the memory segment exception (!). This happens only
> > > for the Cluster test; everything else works.
> > >
> > > The code in the function is:
> > >
> > > private static DataSet<Vertex<String, Long>> aggregatePartialValuesSplitVertices(
> > >       DataSet<Vertex<String, Long>> resultedVertices) {
> > >
> > >    return resultedVertices.flatMap(
> > >          new FlatMapFunction<Vertex<String, Long>, Vertex<String, Long>>() {
> > >
> > >       @Override
> > >       public void flatMap(Vertex<String, Long> vertex,
> > >             Collector<Vertex<String, Long>> collector) throws Exception {
> > >          int pos = vertex.getId().indexOf("_");
> > >
> > >          // if this is a split vertex, map it back to its original id
> > >          if (pos > -1) {
> > >             collector.collect(new Vertex<String, Long>(
> > >                   vertex.getId().substring(0, pos), vertex.getValue()));
> > >          } else {
> > >             collector.collect(vertex);
> > >          }
> > >       }
> > >    }).groupBy(0).reduceGroup(
> > >          new GroupReduceFunction<Vertex<String, Long>, Vertex<String, Long>>() {
> > >
> > >       @Override
> > >       public void reduce(Iterable<Vertex<String, Long>> iterable,
> > >             Collector<Vertex<String, Long>> collector) throws Exception {
> > >          // sum the partial values of all vertices that share an id
> > >          long sum = 0;
> > >          Vertex<String, Long> vertex = new Vertex<String, Long>();
> > >
> > >          Iterator<Vertex<String, Long>> iterator = iterable.iterator();
> > >          while (iterator.hasNext()) {
> > >             vertex = iterator.next();
> > >             sum += vertex.getValue();
> > >          }
> > >
> > >          collector.collect(new Vertex<String, Long>(vertex.getId(), sum));
> > >       }
> > >    });
> > > }
> > >
> > > To me, nothing seems out of the ordinary here. This is regular user code.
> > > And the behaviour in the end is definitely not the one expected. Any idea
> > > why this might be happening?
> > >
> > > Thanks!
> > > Andra
> > >
> > > On Fri, Mar 27, 2015 at 12:08 AM, Andra Lungu <lungu.an...@gmail.com>
> > > wrote:
> > >
> > > > Oops! Sorry! I did not know the mailing list does not support attachments :)
> > > > https://gist.github.com/andralungu/fba36d77f79189daa183
> > > >
> > > > On Fri, Mar 27, 2015 at 12:02 AM, Andra Lungu <lungu.an...@gmail.com>
> > > > wrote:
> > > >
> > > >> Hi Fabian,
> > > >>
> > > >> I uploaded a file with my execution plan.
> > > >>
> > > >> On Thu, Mar 26, 2015 at 11:50 PM, Fabian Hueske <fhue...@gmail.com>
> > > >> wrote:
> > > >>
> > > >>> Hi Andra,
> > > >>>
> > > >>> the error is independent of the size of the data set. A HashTable needs
> > > >>> at least 33 memory pages to operate.
> > > >>> Since you have 820MB of managed memory and the size of a memory page is
> > > >>> 32KB, there should be more than 25k pages available.
> > > >>>
> > > >>> Can you post the execution plan of the program you execute (
> > > >>> ExecutionEnvironment.getExecutionPlan() )?
> > > >>>
> > > >>> Best, Fabian
> > > >>>
> > > >>> 2015-03-26 23:31 GMT+01:00 Andra Lungu <lungu.an...@gmail.com>:
> > > >>>
> > > >>> > For 20 edges and 5 nodes, that should be more than enough.
> > > >>> >
> > > >>> > On Thu, Mar 26, 2015 at 11:24 PM, Andra Lungu <lungu.an...@gmail.com>
> > > >>> > wrote:
> > > >>> >
> > > >>> > > Sure,
> > > >>> > >
> > > >>> > > 3470 [main] INFO org.apache.flink.runtime.taskmanager.TaskManager  -
> > > >>> > > Using 820 MB for Flink managed memory.
> > > >>> > >
> > > >>> > > On Thu, Mar 26, 2015 at 4:48 PM, Robert Metzger <rmetz...@apache.org>
> > > >>> > > wrote:
> > > >>> > >
> > > >>> > >> Hi,
> > > >>> > >>
> > > >>> > >> during startup, Flink will log something like:
> > > >>> > >> 16:48:09,669 INFO org.apache.flink.runtime.taskmanager.TaskManager
> > > >>> > >>      - Using 1193 MB for Flink managed memory.
> > > >>> > >>
> > > >>> > >> Can you tell us how much memory Flink is managing in your case?
> > > >>> > >>
> > > >>> > >>
> > > >>> > >>
> > > >>> > >> On Thu, Mar 26, 2015 at 4:46 PM, Andra Lungu <lungu.an...@gmail.com>
> > > >>> > >> wrote:
> > > >>> > >> > Hello everyone,
> > > >>> > >> >
> > > >>> > >> > I guess I need to revive this old discussion:
> > > >>> > >> >
> > > >>> > >> > http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Memory-segment-error-when-migrating-functional-code-from-Flink-0-9-to-0-8-td3687.html
> > > >>> > >> >
> > > >>> > >> > At that point, the fix was to kindly ask Alex to make his project
> > > >>> > >> > work with 0.9.
> > > >>> > >> >
> > > >>> > >> > Now, I am not that lucky!
> > > >>> > >> >
> > > >>> > >> > This is the code:
> > > >>> > >> > https://github.com/andralungu/gelly-partitioning/tree/alphaSplit
> > > >>> > >> >
> > > >>> > >> > The main program (NodeSplitting) is working nicely; I get the correct
> > > >>> > >> > result. But if you run the test, you will see that collection works and
> > > >>> > >> > cluster fails miserably with this exception:
> > > >>> > >> >
> > > >>> > >> > Caused by: java.lang.Exception: The data preparation for task
> > > >>> > >> > 'Join(Join at weighEdges(NodeSplitting.java:112))
> > > >>> > >> > (04e172e761148a65783a4363406e08c0)' , caused an error: Too few memory
> > > >>> > >> > segments provided. Hash Join needs at least 33 memory segments.
> > > >>> > >> >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > > >>> > >> >     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > >>> > >> >     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:209)
> > > >>> > >> >     at java.lang.Thread.run(Thread.java:745)
> > > >>> > >> > Caused by: java.lang.IllegalArgumentException: Too few memory segments
> > > >>> > >> > provided. Hash Join needs at least 33 memory segments.
> > > >>> > >> >
> > > >>> > >> > I am running locally, from IntelliJ, on a tiny graph.
> > > >>> > >> > $ cat /proc/meminfo
> > > >>> > >> > MemTotal:       11405696 kB
> > > >>> > >> > MemFree:         5586012 kB
> > > >>> > >> > Buffers:          178100 kB
> > > >>> > >> >
> > > >>> > >> > I am sure I did not run out of memory...
> > > >>> > >> >
> > > >>> > >> > Any thoughts on this?
> > > >>> > >> >
> > > >>> > >> > Thanks!
> > > >>> > >> > Andra
> > > >>> > >> >
> > > >>> > >>
> > > >>> > >
> > > >>> > >
> > > >>> >
> > > >>>
> > > >>
> > > >>
> > > >
> > >
> >
>
