Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/544#issuecomment-88088234 Sure, I thought it was obvious from the ugly if :) In EmitOneEdgePerNode we have: public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, Edge<K, EV>>> out) { out.collect(new Tuple2<K, Edge<K, EV>>(edge.getSource(), edge)); out.collect(new Tuple2<K, Edge<K, EV>>(edge.getTarget(), edge)); } so, for an edge 4-5; when you have 4 - you collect edge 4-5; when you have 5 - you collect edge 4-5 again. So, in Jaccard, instead of having: neighborsHashSet.add(next.f1) in the EdgesFunction, you now need to do something like: if(next.f1.getSource() == next.f0) { neighborsHashSet.add(next.f1.getTarget()); } else { neighborsHashSet.add(next.f1.getSource()); } If that's the desired behaviour we can keep the if, but at least for me, it was not expected :D
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---