[ https://issues.apache.org/jira/browse/FLINK-2310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14619230#comment-14619230 ]
ASF GitHub Bot commented on FLINK-2310: --------------------------------------- Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/892#discussion_r34189351 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/AdamicAdarSimilarityMeasure.java --- @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example; +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.ReduceNeighborsFunction; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.Triplet; +import org.apache.flink.graph.example.utils.AdamicAdarSimilarityMeasureData; +import java.util.HashSet; + +/** + * Given a directed, unweighted graph, return a weighted graph where the edge values are equal + * to the Adamic Acard similarity coefficient which is given as + * Summation of weights of common neighbors of the source and destination vertex + *The weights are given as 1/log(nK) nK is the degree or the vertex + * See <a href="http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf">Friends and neighbors on the Web</a> + * + * <p> + * Input files are plain text files and must be formatted as follows: + * <br> + * Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs. + * Edges themselves are separated by newlines. + * For example: <code>1 2\n1 3\n</code> defines two edges 1-2 and 1-3. + * </p> + * + * Usage <code> AdamicAdarSimilarityMeasure <edge path> <result path></code><br> + * If no parameters are provided, the program is run with default data from + * {@link org.apache.flink.graph.example.utils.AdamicAdarSimilarityMeasureData} + */ +@SuppressWarnings("serial") +public class AdamicAdarSimilarityMeasure implements ProgramDescription { + + + public static void main(String[] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Edge<Long, Double>> edges = AdamicAdarSimilarityMeasure.getEdgesDataSet(env); + + /*Graph is generated without vertex weights. Vertices will have a Tuple2 as value + where first field will be the weight of the vertex as 1/log(kn) kn is the degree of the vertex + Second field is a HashSet whose elements are again Tuple2, of which first field is the VertexID of the neighbor + and second field is the weight of that neighbor + */ + Graph<Long, Tuple2<Double, HashSet<Tuple2<Long, Double>>>, Double> graph = + Graph.fromDataSet(edges,new mapVertices(), env); + + DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees(); + + //vertices are given weights in graph + graph = graph.joinWithVertices(degrees,new AssignWeightToVertices()); + + //neighbors are computed for all the vertices + DataSet<Tuple2<Long, Tuple2<Double, HashSet<Tuple2<Long, Double>>>>> computedNeighbors = + graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL); + + //graph is updated with the new vertex values + Graph<Long, Tuple2<Double, HashSet<Tuple2<Long, Double>>>, Double> graphWithVertexValues = + graph.joinWithVertices(computedNeighbors, new UpdateGraphVertices()); + + //edges are given Adamic Adar coefficient as value + DataSet<Edge<Long, Double>> edgesWithAdamicValues = graphWithVertexValues.getTriplets().map(new ComputeAdamic()); + + // emit result + if (fileOutput) { + edgesWithAdamicValues.writeAsCsv(outputPath, "\n", ","); + + // since file sinks are lazy, we trigger the execution explicitly + env.execute("Executing Adamic Adar Similarity Measure"); + } else { + edgesWithAdamicValues.print(); + } + } + + @Override + public String getDescription() { + return "Adamic Adar Similarity Measure"; + } + + /* + * Each vertex has a Tuple2 as value. The first field of the Tuple is the weight of the vertex + * The second field is the HashSet containing of all the neighbors and their weights + */ + private static final class GatherNeighbors implements ReduceNeighborsFunction<Tuple2<Double, + HashSet<Tuple2<Long, Double>>>> { + @Override + public Tuple2<Double, HashSet<Tuple2<Long, Double>>> + reduceNeighbors(Tuple2<Double, HashSet<Tuple2<Long, Double>>> first, + Tuple2<Double, HashSet<Tuple2<Long, Double>>> second) { + first.f1.addAll(second.f1); + return first; + } + } + + //mapfunction to initialise Vertex values from edge dataset + private static class mapVertices implements MapFunction<Long, Tuple2<Double, HashSet<Tuple2<Long, Double>>>> { + @Override + public Tuple2<Double, HashSet<Tuple2<Long, Double>>> map(Long value) { + HashSet<Tuple2<Long, Double>> neighbors = new HashSet<Tuple2<Long, Double>>(); + neighbors.add(new Tuple2<Long, Double>(value, 0.0)); + Double val = Double.MIN_VALUE; + return new Tuple2<Double, HashSet<Tuple2<Long, Double>>>(val, neighbors); + } + } + + //mapfunction to assign weight for each vertex as 1/log(kn) where kn is the degree of the vertex + private static class AssignWeightToVertices implements + MapFunction<Tuple2<Tuple2<Double, HashSet<Tuple2<Long, Double>>>, Long>, + Tuple2<Double, HashSet<Tuple2<Long, Double>>>> { + @Override + public Tuple2<Double, HashSet<Tuple2<Long, Double>>> + map(Tuple2<Tuple2<Double, HashSet<Tuple2<Long, Double>>>, Long> value) throws Exception { + + value.f0.f0 = 1.0/Math.log(value.f1); + + for(Tuple2<Long, Double> t : value.f0.f1) { + t = new Tuple2<Long, Double> (t.f0, value.f0.f0); + value.f0.f1.clear(); + value.f0.f1.add(t); + } + return value.f0; + } + } + + //mapfunction to update values of vertices in the Graph --- End diff -- Let's be consistent and out them all with /** ...*/ when they're in front of classes > Add an Adamic-Adar Similarity example > ------------------------------------- > > Key: FLINK-2310 > URL: https://issues.apache.org/jira/browse/FLINK-2310 > Project: Flink > Issue Type: Task > Components: Gelly > Reporter: Andra Lungu > Assignee: Shivani Ghatge > Priority: Minor > > Just as Jaccard, the Adamic-Adar algorithm measures the similarity between a > set of nodes. However, instead of counting the common neighbors and dividing > them by the total number of neighbors, the similarity is weighted according > to the vertex degrees. In particular, it's equal to log(1/numberOfEdges). > The Adamic-Adar algorithm can be broken into three steps: > 1). For each vertex, compute the log of its inverse degrees (with the formula > above) and set it as the vertex value. > 2). Each vertex will then send this new computed value along with a list of > neighbors to the targets of its out-edges > 3). Weigh the edges with the Adamic-Adar index: Sum over n from CN of > log(1/k_n)(CN is the set of all common neighbors of two vertices x, y. k_n is > the degree of node n). See [2] > Prerequisites: > - Full understanding of the Jaccard Similarity Measure algorithm > - Reading the associated literature: > [1] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf > [2] > http://stackoverflow.com/questions/22565620/fast-algorithm-to-compute-adamic-adar -- This message was sent by Atlassian JIRA (v6.3.4#6332)