I would go for the first solution with the join.
This gives the engine the highest degree of freedom:
- repartition vs. broadcast-forward
- sort-merge vs. hash-join
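To illustrate the "sort-merge vs. hash-join" choice for this particular intersection, here is a minimal sketch in plain Java. It is not Flink code and not how Flink's runtime implements its join operators. The class and method names (`JoinStrategies`, `hashJoinKeys`, `sortMergeJoinKeys`) are hypothetical, and the sketch assumes vertex IDs are unique within each input. Both strategies find the same matching IDs. They differ only in how they find them (a hash table built from one side vs. two sorted inputs merged in one pass) and in the order of the output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class JoinStrategies {

    // Hash join: build a hash table from one input, then probe it with the other.
    // Output follows the order of the probe side.
    static List<Long> hashJoinKeys(List<Long> build, List<Long> probe) {
        Set<Long> table = new HashSet<>(build);
        List<Long> out = new ArrayList<>();
        for (Long key : probe) {
            if (table.contains(key)) {
                out.add(key);
            }
        }
        return out;
    }

    // Sort-merge join: sort both inputs, then advance two cursors in lockstep.
    // Assumes keys are unique per input (no duplicate IDs).
    static List<Long> sortMergeJoinKeys(List<Long> left, List<Long> right) {
        List<Long> l = new ArrayList<>(left);
        List<Long> r = new ArrayList<>(right);
        l.sort(null); // null comparator = natural ordering
        r.sort(null);
        List<Long> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < l.size() && j < r.size()) {
            int cmp = l.get(i).compareTo(r.get(j));
            if (cmp == 0) {
                out.add(l.get(i));
                i++;
                j++;
            } else if (cmp < 0) {
                i++;
            } else {
                j++;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> left = Arrays.asList(5L, 1L, 3L, 7L);
        List<Long> right = Arrays.asList(3L, 8L, 1L, 9L);
        System.out.println(hashJoinKeys(left, right));      // [3, 1] (probe order)
        System.out.println(sortMergeJoinKeys(left, right)); // [1, 3] (sorted order)
    }
}
```

A join only states which keys must match, which is what leaves the engine free to choose between local strategies like these (and, separately, between repartitioning both inputs and broadcasting one of them).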
Best,
Fabian

2015-10-28 18:45 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

> Hi Martin,
>
> isn't finding the intersection of edges enough in this case?
> And assuming there are no duplicate edges, I believe a join should do the
> trick.
>
> Cheers,
> -Vasia.
>
> On 28 October 2015 at 13:15, Martin Junghanns <m.jungha...@mailbox.org>
> wrote:
>
> > Hi all!
> >
> > While working on FLINK-2905, I was wondering what a good (and fast) way to
> > compute the intersection between two data sets (Gelly vertices in my case)
> > with unknown size would be.
> >
> > I came up with three ways to solve this.
> >
> > Consider two sets:
> >
> > DataSet<Vertex<K, VV>> verticesLeft = this.getVertices();
> > DataSet<Vertex<K, VV>> verticesRight = graph.getVertices();
> >
> > Way 1 (join)
> >
> > intersectVertices = verticesLeft.join(verticesRight)
> >   .where(0)
> >   .equalTo(0)
> >   .with(new JoinFunction<Vertex<K, VV>, /* .. */>() {
> >     @Override
> >     public Vertex<K, VV> join(Vertex<K, VV> first, Vertex<K, VV> second)
> >         throws Exception {
> >       return first;
> >     }
> >   });
> >
> > Way 2 (coGroup)
> >
> > intersectVertices = verticesLeft.coGroup(verticesRight)
> >   .where(0)
> >   .equalTo(0)
> >   .with(new CoGroupFunction<Vertex<K, VV>, /* .. */>() {
> >     @Override
> >     public void coGroup(Iterable<Vertex<K, VV>> first,
> >         Iterable<Vertex<K, VV>> second,
> >         Collector<Vertex<K, VV>> out) throws Exception {
> >       Iterator<Vertex<K, VV>> leftIt = first.iterator();
> >       Iterator<Vertex<K, VV>> rightIt = second.iterator();
> >       if (leftIt.hasNext() && rightIt.hasNext()) {
> >         out.collect(leftIt.next());
> >       }
> >     }
> >   });
> >
> > Way 3 (union + groupBy + aggregate)
> >
> > intersectVertices = verticesLeft.union(verticesRight)
> >   .map(new MapFunction<Vertex<K, VV>, Tuple3<K, VV, Integer>>() {
> >     @Override
> >     public Tuple3<K, VV, Integer> map(Vertex<K, VV> vertex)
> >         throws Exception {
> >       return new Tuple3<>(vertex.f0, vertex.f1, 1);
> >     }
> >   }).withForwardedFields("f0;f1")
> >   .groupBy(0) // vertex id
> >   .aggregate(Aggregations.SUM, 2)
> >   .filter(new FilterFunction<Tuple3<K, VV, Integer>>() {
> >     @Override
> >     public boolean filter(Tuple3<K, VV, Integer> value) {
> >       return value.f2 == 2;
> >     }
> >   })
> >   .map(new MapFunction<Tuple3<K, VV, Integer>, Vertex<K, VV>>() {
> >     @Override
> >     public Vertex<K, VV> map(Tuple3<K, VV, Integer> vertexWithAggregate) {
> >       return new Vertex<>(vertexWithAggregate.f0, vertexWithAggregate.f1);
> >     }
> >   }).withForwardedFields("f0;f1");
> >
> > Thanks for your input.
> >
> > Best,
> >
> > Martin
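To compare what the three variants in Martin's mail actually compute, here is a plain-Java sketch that mimics each one on in-memory lists. It is not Flink code. `IntersectSketch`, `V`, `ids`, and the `via*` methods are hypothetical names, and `V` is a stand-in for Gelly's `Vertex<K, VV>` with a `long` id and a `String` value. When IDs are unique within each input (Vasia's assumption), all three agree. When one side contains a duplicated ID, they diverge: the join emits one record per matching pair, the coGroup emits one record per key, and the union-and-count variant drops the key because its count is no longer exactly 2.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class IntersectSketch {

    // Hypothetical stand-in for Gelly's Vertex<K, VV>: an id and a value.
    static final class V {
        final long id;
        final String value;
        V(long id, String value) { this.id = id; this.value = value; }
    }

    // Way 1 (join): emit the left vertex for every left/right pair with equal ids.
    // Nested loops are used only for clarity, not as an efficient strategy.
    static List<V> viaJoin(List<V> left, List<V> right) {
        List<V> out = new ArrayList<>();
        for (V l : left) {
            for (V r : right) {
                if (l.id == r.id) {
                    out.add(l);
                }
            }
        }
        return out;
    }

    // Way 2 (coGroup): group both sides by id and, for each id present on
    // both sides, emit the first left vertex of that group.
    static List<V> viaCoGroup(List<V> left, List<V> right) {
        Map<Long, List<V>> l = groupById(left);
        Map<Long, List<V>> r = groupById(right);
        List<V> out = new ArrayList<>();
        for (Map.Entry<Long, List<V>> e : l.entrySet()) {
            if (r.containsKey(e.getKey())) {
                out.add(e.getValue().get(0));
            }
        }
        return out;
    }

    // Way 3 (union + groupBy + sum): count occurrences per id across both
    // inputs and keep the ids whose count is exactly 2.
    static List<Long> viaUnionCount(List<V> left, List<V> right) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (V v : left) counts.merge(v.id, 1, Integer::sum);
        for (V v : right) counts.merge(v.id, 1, Integer::sum);
        List<Long> out = new ArrayList<>();
        for (Map.Entry<Long, Integer> e : counts.entrySet()) {
            if (e.getValue() == 2) out.add(e.getKey());
        }
        return out;
    }

    // Groups vertices by id, sorted by id.
    static Map<Long, List<V>> groupById(List<V> vs) {
        Map<Long, List<V>> m = new TreeMap<>();
        for (V v : vs) m.computeIfAbsent(v.id, k -> new ArrayList<>()).add(v);
        return m;
    }

    // Extracts the ids, preserving order.
    static List<Long> ids(List<V> vs) {
        List<Long> out = new ArrayList<>();
        for (V v : vs) out.add(v.id);
        return out;
    }

    public static void main(String[] args) {
        List<V> left = Arrays.asList(new V(1, "a"), new V(2, "b"), new V(3, "c"));
        List<V> right = Arrays.asList(new V(2, "x"), new V(3, "y"), new V(4, "z"));
        System.out.println(ids(viaJoin(left, right)));    // [2, 3]
        System.out.println(ids(viaCoGroup(left, right))); // [2, 3]
        System.out.println(viaUnionCount(left, right));   // [2, 3]

        // Duplicate id 2 on the right side: the three variants now disagree.
        List<V> rightDup = Arrays.asList(new V(2, "x"), new V(2, "x2"), new V(3, "y"));
        System.out.println(ids(viaJoin(left, rightDup)));    // [2, 2, 3]
        System.out.println(ids(viaCoGroup(left, rightDup))); // [2, 3]
        System.out.println(viaUnionCount(left, rightDup));   // [3]
    }
}
```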