Hi Till,

Thanks a lot for your help! I'll try to use another variable type in the meantime.
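In case it helps anyone else who runs into this, here is a minimal sketch (plain Java, no Flink dependencies; the helper names `writePrimitiveLong` and `writeNullableString` are made up for illustration, not Flink API) of the difference Till describes: writing a `Long` field forces auto-unboxing to a primitive `long`, which throws a `NullPointerException` on null, while a `String` code path can handle the null case explicitly.

```java
// Illustration only: toy stand-ins for the two serializer behaviours
// described in Till's reply. Not actual Flink code.
public class NullFieldSketch {

    // Mimics a serializer that writes a primitive long: auto-unboxing
    // a null Long throws NullPointerException, as in the stack trace below.
    static long writePrimitiveLong(Long value) {
        return value; // NPE here when value == null
    }

    // Mimics a null-tolerant path: the null case is checked explicitly.
    static String writeNullableString(String value) {
        return value == null ? "<null>" : value;
    }

    public static void main(String[] args) {
        System.out.println(writeNullableString(null)); // prints "<null>"
        try {
            writePrimitiveLong(null);
        } catch (NullPointerException e) {
            System.out.println("null Long cannot be unboxed");
        }
    }
}
```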
Best regards,
Olga Golovneva

On Thu, Sep 15, 2016 at 1:03 PM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Olga,
>
> it’s indeed an error in Flink’s Summarization algorithm. The problem is the
> following: the vertex group value of the VertexGroupItem is null in the
> VertexGroupReducer. This works in the SummarizationITCase because the
> vertex value is of type String and the StringSerializer can deal with null
> values.
>
> However, in your case, where you use Longs, it fails because the
> LongSerializer cannot handle null values. You can verify this behaviour by
> changing the vertex value type to String. Then everything should work
> without a problem.
>
> I’ve cc’ed Martin, who can probably tell you more about the Summarization
> algorithm. I’ve also opened a JIRA ticket [1] to fix this problem.
>
> Thanks for reporting this bug.
>
> [1] https://issues.apache.org/jira/browse/FLINK-4624
>
> Cheers,
> Till
>
> On Thu, Sep 15, 2016 at 5:05 PM, Olga Golovneva <melcha...@gmail.com> wrote:
>
> > Hi Till,
> >
> > I've created a simple (Java) example to show you what's going on. The
> > code is attached and shown below. This example creates a simple graph
> > with Double edge values and Long vertex values. It then runs
> > Summarization, which should compute a condensed version of the input
> > graph by grouping vertices and edges based on their values. I run this
> > code with IntelliJ IDEA. The code executes fine until you try to see
> > what is written in the resulting edges (just uncomment line 46,
> > edgesOut.print();). Then it throws the following exception:
> >
> > _________EXCEPTION START_____________
> > Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> > at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:830)
> > at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:773)
> > at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:773)
> > at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> > at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> > at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> > at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > Caused by: org.apache.flink.types.NullFieldException: Field 2 is null, but expected to hold a value.
> > at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:126)
> > at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
> > at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
> > at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:83)
> > at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:85)
> > at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
> > at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> > at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
> > at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
> > at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> > at org.apache.flink.graph.library.Summarization$VertexGroupReducer.reduce(Summarization.java:323)
> > at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
> > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:590)
> > at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.NullPointerException
> > at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:64)
> > at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:27)
> > at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
> > ... 15 more
> >
> > _____________EXCEPTION END__________________
> >
> > It looks like the problem is in the following lines in Summarization:
> >
> > DataSet<Edge<K, EV>> edgesForGrouping = input.getEdges()
> >         .join(vertexToRepresentativeMap)
> >         .where(0)   // source vertex id
> >         .equalTo(0) // vertex id
> >         .with(new SourceVertexJoinFunction<K, EV>())
> >         .join(vertexToRepresentativeMap)
> >         .where(1)   // target vertex id
> >         .equalTo(0) // vertex id
> >         .with(new TargetVertexJoinFunction<K, EV>());
> >
> > If you try to print the edges before this step, it works fine. But after
> > this step, my IDE gives the same exception.
> >
> > I would really appreciate any help.
> >
> > Thank you,
> > Olga
> >
> > _________EXAMPLE START_____________________
> >
> > package org.apache.flink.graph.examples;
> >
> > import org.apache.flink.api.common.ProgramDescription;
> > import org.apache.flink.api.java.DataSet;
> > import org.apache.flink.api.java.ExecutionEnvironment;
> > import org.apache.flink.graph.Edge;
> > import org.apache.flink.graph.Graph;
> > import org.apache.flink.graph.Vertex;
> > import org.apache.flink.graph.library.Summarization;
> > import java.util.LinkedList;
> > import java.util.List;
> >
> > public class MySummarizationExample implements ProgramDescription {
> >
> >     @SuppressWarnings("serial")
> >     public static void main(String[] args) throws Exception {
> >
> >         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> >
> >         // Create the graph
> >         DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
> >         DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
> >         Graph<Long, Long, Double> graph = Graph.fromDataSet(vertices, edges, env);
> >
> >         // Emit the input
> >         System.out.println("Executing example with following inputs:\n" + "Vertices:\n");
> >         vertices.print();
> >         System.out.println("Edges:\n");
> >         edges.print();
> >
> >         Graph<Long, Summarization.VertexValue<Long>, Summarization.EdgeValue<Double>> result = graph
> >                 .run(new Summarization<Long, Long, Double>());
> >
> >         // Now we want to read the output
> >         DataSet<Edge<Long, Summarization.EdgeValue<Double>>> edgesOut = result.getEdges();
> >         DataSet<Vertex<Long, Summarization.VertexValue<Long>>> verticesOut = result.getVertices();
> >
> >         // Emit the result
> >         System.out.println("Summarized graph:\n" + "Vertices:\n");
> >         verticesOut.print();
> >         System.out.println("Edges:\n");
> >         edgesOut.print();
> >     }
> >
> >     @Override
> >     public String getDescription() {
> >         return "Summarization Example";
> >     }
> >
> >     // Define the edges
> >     private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
> >         Object[][] DEFAULT_EDGES = new Object[][] {
> >                 new Object[]{1L, 2L, 1.0},
> >                 new Object[]{1L, 4L, 3.0},
> >                 new Object[]{2L, 3L, 6.0},
> >                 new Object[]{2L, 4L, 5.0},
> >                 new Object[]{2L, 5L, 1.0},
> >                 new Object[]{3L, 5L, 5.0},
> >                 new Object[]{3L, 6L, 2.0},
> >                 new Object[]{4L, 5L, 1.0},
> >                 new Object[]{5L, 6L, 4.0}
> >         };
> >         List<Edge<Long, Double>> edgeList = new LinkedList<Edge<Long, Double>>();
> >         for (Object[] edge : DEFAULT_EDGES) {
> >             edgeList.add(new Edge<Long, Double>((Long) edge[0], (Long) edge[1], (Double) edge[2]));
> >         }
> >         return env.fromCollection(edgeList);
> >     }
> >
> >     // Define the vertices
> >     private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) {
> >         // We will summarize by <VV> = Long
> >         Object[][] DEFAULT_VERTICES = new Object[][] {
> >                 new Object[]{1L, 1L},
> >                 new Object[]{2L, 1L},
> >                 new Object[]{3L, 5L},
> >                 new Object[]{4L, 5L},
> >                 new Object[]{5L, 5L}
> >         };
> >         List<Vertex<Long, Long>> vertexList = new LinkedList<Vertex<Long, Long>>();
> >         for (Object[] vertex : DEFAULT_VERTICES) {
> >             vertexList.add(new Vertex<Long, Long>((Long) vertex[0], (Long) vertex[1]));
> >         }
> >         return env.fromCollection(vertexList);
> >     }
> > }
> >
> > _________EXAMPLE END_____________________
> >
> > Best regards,
> > Olga Golovneva
> >
> > On Thu, Sep 15, 2016 at 9:16 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> >
> >> Hi Olga,
> >>
> >> can you provide us with a bit more detail about the problem? The full
> >> stack trace of the exception and the program you're trying to run
> >> would be helpful.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Wed, Sep 14, 2016 at 9:49 PM, Olga Golovneva <melcha...@gmail.com> wrote:
> >>
> >> > Hi devs,
> >> >
> >> > Do you know if there is an example (besides the ITCase) of the usage
> >> > of the Summarization library in Gelly? I'm having some problems
> >> > trying to use it in my code. In particular, I cannot print the output
> >> > edges (it throws the following exception: Exception in thread "main"
> >> > org.apache.flink.runtime.client.JobExecutionException: Job execution
> >> > failed.), while the vertices are printed correctly.
> >> >
> >> > Best regards,
> >> > Olga