[
https://issues.apache.org/jira/browse/FLINK-2361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14630360#comment-14630360
]
Andra Lungu commented on FLINK-2361:
------------------------------------
Hi,
I tweaked the code to look like this:
DataSet<Edge<String, NullValue>> edges = getEdgesDataSet(env);
DataSet<Vertex<String, Long>> vertices = edges.flatMap(new
FlatMapFunction<Edge<String, NullValue>, Vertex<String, Long>>() {
@Override
public void flatMap(Edge<String, NullValue> edge,
Collector<Vertex<String, Long>> collector) throws Exception {
collector.collect(new Vertex<String,
Long>(edge.getSource(), Long.parseLong(edge.getSource())));
collector.collect(new Vertex<String,
Long>(edge.getTarget(), Long.parseLong(edge.getTarget())));
}
}).distinct();
if (fileOutput) {
vertices.writeAsCsv(vertexInputPath, "\n", ",");
env.execute();
}
DataSet<Vertex<String, Long>> rereadVertices =
env.readCsvFile(vertexInputPath)
.fieldDelimiter(",").lineDelimiter("\n").ignoreComments("#")
.types(String.class, Long.class).map(new
MapFunction<Tuple2<String, Long>, Vertex<String, Long>>() {
@Override
public Vertex<String, Long>
map(Tuple2<String, Long> tuple2) throws Exception {
return new Vertex<String,
Long>(tuple2.f0, tuple2.f1);
}
});
Graph<String, Long, NullValue> graph =
Graph.fromDataSet(rereadVertices, edges, env);
which is not what I would normally do, BTW...
and I still get:
Caused by: java.lang.Exception: Target vertex '124518874' does not exist!.
at
org.apache.flink.graph.spargel.VertexCentricIteration$VertexUpdateUdfSimpleVV.coGroup(VertexCentricIteration.java:300)
at
org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:220)
at
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
at
org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:139)
at
org.apache.flink.runtime.iterative.task.IterationTailPactTask.run(IterationTailPactTask.java:107)
at
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:722)
Lucky day :| At least it's missing a different vertex now...
> flatMap + distinct gives erroneous results for big data sets
> ------------------------------------------------------------
>
> Key: FLINK-2361
> URL: https://issues.apache.org/jira/browse/FLINK-2361
> Project: Flink
> Issue Type: Bug
> Components: Gelly
> Affects Versions: 0.10
> Reporter: Andra Lungu
>
> When running the simple Connected Components algorithm (currently in Gelly)
> on the twitter follower graph, with 1, 100 or 10000 iterations, I get the
> following error:
> Caused by: java.lang.Exception: Target vertex '657282846' does not exist!.
> at
> org.apache.flink.graph.spargel.VertexCentricIteration$VertexUpdateUdfSimpleVV.coGroup(VertexCentricIteration.java:300)
> at
> org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:220)
> at
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> at
> org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:139)
> at
> org.apache.flink.runtime.iterative.task.IterationTailPactTask.run(IterationTailPactTask.java:107)
> at
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:722)
> Now this is very bizzare as the DataSet of vertices is produced from the
> DataSet of edges... Which means there cannot be a an edge with an invalid
> target id... The method calls flatMap to isolate the src and trg ids and
> distinct to ensure their uniqueness.
> The algorithm works fine for smaller data sets...
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)