I would go for the first solution with the join.

This gives the engine the highest degree of freedom:
- repartition vs. broadcast-forward
- sort-merge vs. hash-join

Best, Fabian

2015-10-28 18:45 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

> Hi Martin,
>
> isn't finding the intersection of edges enough in this case?
> And assuming there are no duplicate edges, I believe a join should do the
> trick.
>
> Cheers,
> -Vasia.
>
> On 28 October 2015 at 13:15, Martin Junghanns <m.jungha...@mailbox.org>
> wrote:
>
> > Hi all!
> >
> > While working on FLINK-2905, I was wondering what a good (and fast) way
> to
> > compute the intersect between two data sets (Gelly vertices in my case)
> > with unknown size would be.
> >
> > I came up with three ways to solve this:
> >
> > Consider two sets:
> >
> > DataSet<Vertex<K, VV>> verticesLeft =  this.getVertices();
> > DataSet<Vertex<K, VV>> verticesRight = graph.getVertices();
> >
> > Way 1 (join)
> >
> > intersectVertices = verticesLeft.join(verticesRight)
> >  .where(0)
> >  .equalTo(0)
> >  .with(new JoinFunction<Vertex<K, VV>, /* .. * ./>() {
> >   @Override
> >   public Vertex<K, VV> join(Vertex<K, VV> first, Vertex<K, VV> second)
> >    throws Exception {
> >     return first;
> >   }
> > });
> >
> > Way 2 (coGroup)
> >
> > intersectVertices = verticesLeft.coGroup(verticesRight)
> >  .where(0)
> >  .equalTo(0)
> >  .with(new CoGroupFunction<Vertex<K, VV>, /* .. */ >() {
> >   @Override
> >   public void coGroup(Iterable<Vertex<K, VV>> first,
> >         Iterable<Vertex<K, VV>> second,
> >         Collector<Vertex<K, VV>> out) throws Exception {
> >    Iterator<Vertex<K, VV>> leftIt = first.iterator();
> >    Iterator<Vertex<K, VV>> rightIt = second.iterator();
> >    if (leftIt.hasNext() && rightIt.hasNext()) {
> >     out.collect(leftIt.next());
> >    }
> >   }
> >  });
> >
> > Way 3 (union + groupBy + aggregate)
> >
> > intersectVertices = verticesLeft.union(verticesRight)
> >  .map(new MapFunction<Vertex<K, VV>, Tuple3<K, VV, Integer>>() {
> >   @Override
> >   public Tuple3<K, VV, Integer> map(Vertex<K, VV> vertex)
> >         throws Exception {
> >    return new Tuple3<>(vertex.f0, vertex.f1, 1);
> >   }
> >  }).withForwardedFields("f0;f1")
> >  .groupBy(0) // vertex id
> >  .aggregate(Aggregations.SUM, 2)
> >  .filter(new FilterFunction<Tuple3<K, VV, Integer>>() {
> >   @Override
> >   public boolean filter(Tuple3<K, VV, Integer> value) {
> >    return value.f2 == 2;
> >   }
> >  })
> >  .map(new MapFunction<Tuple3<K, VV, Integer>, Vertex<K, VV>>() {
> >   @Override
> >   public Vertex<K, VV> map(Tuple3<K, VV, Integer> vertexWithAggregate) {
> >    return new Vertex<>(vertexWithAggregate.f0, vertexWithAggregate.f1);
> >   }
> >  }).withForwardedFields("f0;f1");
> >
> > Thanks for your input.
> >
> > Best,
> >
> > Martin
> >
> >
> >
> >
>

Reply via email to