Hi Martin, isn't finding the intersection of edges enough in this case? And assuming there are no duplicate edges, I believe a join should do the trick.
Cheers, -Vasia. On 28 October 2015 at 13:15, Martin Junghanns <m.jungha...@mailbox.org> wrote: > Hi all! > > While working on FLINK-2905, I was wondering what a good (and fast) way to > compute the intersect between two data sets (Gelly vertices in my case) > with unknown size would be. > > I came up with three ways to solve this: > > Consider two sets: > > DataSet<Vertex<K, VV>> verticesLeft = this.getVertices(); > DataSet<Vertex<K, VV>> verticesRight = graph.getVertices(); > > Way 1 (join) > > intersectVertices = verticesLeft.join(verticesRight) > .where(0) > .equalTo(0) > .with(new JoinFunction<Vertex<K, VV>, /* .. * ./>() { > @Override > public Vertex<K, VV> join(Vertex<K, VV> first, Vertex<K, VV> second) > throws Exception { > return first; > } > }); > > Way 2 (coGroup) > > intersectVertices = verticesLeft.coGroup(verticesRight) > .where(0) > .equalTo(0) > .with(new CoGroupFunction<Vertex<K, VV>, /* .. */ >() { > @Override > public void coGroup(Iterable<Vertex<K, VV>> first, > Iterable<Vertex<K, VV>> second, > Collector<Vertex<K, VV>> out) throws Exception { > Iterator<Vertex<K, VV>> leftIt = first.iterator(); > Iterator<Vertex<K, VV>> rightIt = second.iterator(); > if (leftIt.hasNext() && rightIt.hasNext()) { > out.collect(leftIt.next()); > } > } > }); > > Way 3 (union + groupBy + aggregate) > > intersectVertices = verticesLeft.union(verticesRight) > .map(new MapFunction<Vertex<K, VV>, Tuple3<K, VV, Integer>>() { > @Override > public Tuple3<K, VV, Integer> map(Vertex<K, VV> vertex) > throws Exception { > return new Tuple3<>(vertex.f0, vertex.f1, 1); > } > }).withForwardedFields("f0;f1") > .groupBy(0) // vertex id > .aggregate(Aggregations.SUM, 2) > .filter(new FilterFunction<Tuple3<K, VV, Integer>>() { > @Override > public boolean filter(Tuple3<K, VV, Integer> value) { > return value.f2 == 2; > } > }) > .map(new MapFunction<Tuple3<K, VV, Integer>, Vertex<K, VV>>() { > @Override > public Vertex<K, VV> map(Tuple3<K, VV, Integer> vertexWithAggregate) { > return new Vertex<>(vertexWithAggregate.f0, vertexWithAggregate.f1); > } > }).withForwardedFields("f0;f1"); > > Thanks for your input. > > Best, > > Martin > > > >