Hi Till,

Thanks a lot for your help! I'll try to use another variable type in the
meantime.
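
Concretely, I'll try mapping the Long vertex values to String before running
Summarization, since the StringSerializer can deal with null values. A rough,
untested sketch against the example I sent earlier (using Gelly's mapVertices;
the anonymous MapFunction is just one way to write it, and the surrounding
imports/job setup are omitted):

```java
// Untested sketch: convert vertex values Long -> String so that the
// null vertex group value survives serialization inside Summarization.
Graph<Long, String, Double> stringGraph = graph.mapVertices(
        new MapFunction<Vertex<Long, Long>, String>() {
            @Override
            public String map(Vertex<Long, Long> vertex) {
                return String.valueOf(vertex.getValue());
            }
        });

Graph<Long, Summarization.VertexValue<String>, Summarization.EdgeValue<Double>> result =
        stringGraph.run(new Summarization<Long, String, Double>());
```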

Best regards,
Olga Golovneva

On Thu, Sep 15, 2016 at 1:03 PM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Olga,
>
> it’s indeed an error in Flink’s Summarization algorithm. The problem is the
> following: The vertex group value of the VertexGroupItem is null in the
> VertexGroupReducer. This works in the SummarizationIT case because the
> vertex value is of type String and the StringSerializer can deal with null
> values.
>
> However, in your case where you use longs, it fails, because the
> LongSerializer cannot handle null values. You can verify this behaviour by
> changing the vertex value type to String. Then everything should work
> without a problem.
>
> I’ve cc’ed Martin, who can probably tell you more about the Summarization
> algorithm. I’ve also opened a JIRA ticket [1] to fix this problem.
>
> Thanks for reporting this bug.
>
> [1] https://issues.apache.org/jira/browse/FLINK-4624
>
> Cheers,
> Till
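
The serializer asymmetry Till describes can be reproduced outside Flink. Below
is a self-contained plain-Java sketch (hypothetical serializer methods, not
Flink's actual classes): a string-style serializer can write a null marker
before the payload, while a primitive-long serializer must unbox the value and
so throws a NullPointerException when it is null.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Minimal sketch, not Flink code: shows why a String-valued field can be
// serialized when null while a boxed-Long-valued field cannot.
public class NullSerializationSketch {

    // String-style strategy: write a marker byte, then the payload if present.
    static void serializeString(String value, DataOutputStream out) throws IOException {
        if (value == null) {
            out.writeBoolean(false);   // null marker, nothing else to write
        } else {
            out.writeBoolean(true);
            out.writeUTF(value);
        }
    }

    // Long-style strategy: writes the primitive directly, so a null boxed
    // value is unboxed and throws NullPointerException.
    static void serializeLong(Long value, DataOutputStream out) throws IOException {
        out.writeLong(value);          // NPE here when value == null
    }

    public static void main(String[] args) throws IOException {
        DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());

        serializeString(null, out);    // fine: only the null marker is written
        System.out.println("String with null: ok");

        try {
            serializeLong(null, out);
        } catch (NullPointerException e) {
            System.out.println("Long with null: NullPointerException");
        }
    }
}
```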
>
> On Thu, Sep 15, 2016 at 5:05 PM, Olga Golovneva <melcha...@gmail.com>
> wrote:
>
> > Hi Till,
> >
> > I've created a simple (Java) example to show you what's going on. The
> > code is attached and shown below. It creates a simple graph with Double
> > edge values and Long vertex values, then runs Summarization, which should
> > compute a condensed version of the input graph by grouping vertices and
> > edges based on their values. I run this code with IntelliJ IDEA. The code
> > executes fine until you try to look at the resulting edges (just
> > uncomment line 46, edgesOut.print();). Then it throws the following
> > exception:
> >
> > _________EXCEPTION START_____________
> > Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:830)
> >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:773)
> >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:773)
> >     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> >     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> >     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> >     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> >     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> >     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> >     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > Caused by: org.apache.flink.types.NullFieldException: Field 2 is null, but expected to hold a value.
> >     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:126)
> >     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
> >     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
> >     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:83)
> >     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:85)
> >     at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
> >     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> >     at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
> >     at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
> >     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> >     at org.apache.flink.graph.library.Summarization$VertexGroupReducer.reduce(Summarization.java:323)
> >     at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
> >     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> >     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:590)
> >     at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.NullPointerException
> >     at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:64)
> >     at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:27)
> >     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
> >     ... 15 more
> >
> > _____________EXCEPTION END__________________
> >
> > It looks like the problem is in the following lines in Summarization:
> >
> > DataSet<Edge<K, EV>> edgesForGrouping = input.getEdges()
> >       .join(vertexToRepresentativeMap)
> >       .where(0)  // source vertex id
> >       .equalTo(0) // vertex id
> >       .with(new SourceVertexJoinFunction<K, EV>())
> >       .join(vertexToRepresentativeMap)
> >       .where(1)  // target vertex id
> >       .equalTo(0) // vertex id
> >       .with(new TargetVertexJoinFunction<K, EV>());
> >
> >
> > If you try to print the edges before this step, it works fine. But after
> > this step I get the same exception.
> >
> > I would really appreciate any help.
> >
> > Thank you,
> > Olga
> >
> > _________EXAMPLE START_____________________
> >
> > package org.apache.flink.graph.examples;
> >
> > import org.apache.flink.api.common.ProgramDescription;
> > import org.apache.flink.api.java.DataSet;
> > import org.apache.flink.api.java.ExecutionEnvironment;
> > import org.apache.flink.graph.Edge;
> > import org.apache.flink.graph.Graph;
> > import org.apache.flink.graph.Vertex;
> > import org.apache.flink.graph.library.Summarization;
> > import java.util.LinkedList;
> > import java.util.List;
> >
> > public class MySummarizationExample implements ProgramDescription {
> >
> >     @SuppressWarnings("serial")
> >     public static void main(String[] args) throws Exception {
> >
> >         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> >
> >         // Create the input graph
> >         DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
> >         DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
> >         Graph<Long, Long, Double> graph = Graph.fromDataSet(vertices, edges, env);
> >
> >         // emit input
> >         System.out.println("Executing example with following inputs:\n" + "Vertices:\n");
> >         vertices.print();
> >         System.out.println("Edges:\n");
> >         edges.print();
> >
> >         Graph<Long, Summarization.VertexValue<Long>, Summarization.EdgeValue<Double>> result = graph
> >                 .run(new Summarization<Long, Long, Double>());
> >
> >         // now we want to read the output
> >         DataSet<Edge<Long, Summarization.EdgeValue<Double>>> edgesOut = result.getEdges();
> >         DataSet<Vertex<Long, Summarization.VertexValue<Long>>> verticesOut = result.getVertices();
> >
> >         // emit result
> >         System.out.println("Summarized graph:\n" + "Vertices:\n");
> >         verticesOut.print();
> >         System.out.println("Edges:\n");
> >         edgesOut.print();
> >     }
> >
> >     @Override
> >     public String getDescription() {
> >         return "Summarization Example";
> >     }
> >
> >     // Define edges
> >     private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
> >         Object[][] DEFAULT_EDGES = new Object[][] {
> >                 new Object[]{1L, 2L, 1.0},
> >                 new Object[]{1L, 4L, 3.0},
> >                 new Object[]{2L, 3L, 6.0},
> >                 new Object[]{2L, 4L, 5.0},
> >                 new Object[]{2L, 5L, 1.0},
> >                 new Object[]{3L, 5L, 5.0},
> >                 new Object[]{3L, 6L, 2.0},
> >                 new Object[]{4L, 5L, 1.0},
> >                 new Object[]{5L, 6L, 4.0}
> >         };
> >         List<Edge<Long, Double>> edgeList = new LinkedList<Edge<Long, Double>>();
> >         for (Object[] edge : DEFAULT_EDGES) {
> >             edgeList.add(new Edge<Long, Double>((Long) edge[0], (Long) edge[1], (Double) edge[2]));
> >         }
> >         return env.fromCollection(edgeList);
> >     }
> >
> >     // Define vertices
> >     private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) {
> >         // We will summarize by <VV> = Long
> >         Object[][] DEFAULT_VERTICES = new Object[][] {
> >                 new Object[]{1L, 1L},
> >                 new Object[]{2L, 1L},
> >                 new Object[]{3L, 5L},
> >                 new Object[]{4L, 5L},
> >                 new Object[]{5L, 5L}
> >         };
> >         List<Vertex<Long, Long>> vertexList = new LinkedList<Vertex<Long, Long>>();
> >         for (Object[] vertex : DEFAULT_VERTICES) {
> >             vertexList.add(new Vertex<Long, Long>((Long) vertex[0], (Long) vertex[1]));
> >         }
> >         return env.fromCollection(vertexList);
> >     }
> > }
> >
> > _________EXAMPLE END_____________________
> >
> >
> > Best regards,
> > Olga Golovneva
> >
> > On Thu, Sep 15, 2016 at 9:16 AM, Till Rohrmann <trohrm...@apache.org>
> > wrote:
> >
> >> Hi Olga,
> >>
> >> can you provide us with a bit more detail about the problem? The full
> >> stack trace of the exception and the program you're trying to run would
> >> be helpful.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Wed, Sep 14, 2016 at 9:49 PM, Olga Golovneva <melcha...@gmail.com>
> >> wrote:
> >>
> >> > Hi devs,
> >> >
> >> > Do you know if there is an example (besides the ITCase) of how to use
> >> > the Summarization library in Gelly? I'm having some problems trying to
> >> > use it in my code. In particular, I cannot print the output edges (it
> >> > throws the following exception: Exception in thread "main"
> >> > org.apache.flink.runtime.client.JobExecutionException: Job execution
> >> > failed.), while the vertices are printed correctly.
> >> >
> >> > Best regards,
> >> > Olga
> >> >
> >>
> >
> >
>
