[ https://issues.apache.org/jira/browse/FLINK-1726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377671#comment-14377671 ]

ASF GitHub Bot commented on FLINK-1726:
---------------------------------------

Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/505#discussion_r27017460
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java ---
    @@ -0,0 +1,151 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.VertexCentricIteration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +
    +import java.util.Map;
    +import java.util.TreeMap;
    +
    +/**
    + * Simple Community Detection Algorithm.
    + *
    + * Initially, each vertex is assigned a tuple formed of its own id along with a score equal to 1.0, as value.
    + * The vertices propagate their labels and max scores in iterations, each time adopting the label with the
    + * highest score from the list of received messages. The chosen label is afterwards re-scored.
    + *
    + * The algorithm converges when vertices no longer update their value or when the maximum number of iterations
    + * is reached.
    + *<p>
    + *         The input file is a plain text file and must be formatted as follows:
    + *         <br>
    + *         Edges are represented by tuples of srcVertexId, trgVertexId which are
    + *         separated by tabs. Edges themselves are separated by newlines.
    + *         For example: <code>1    2\n1    3\n</code> defines two edges 1-2 and 1-3
    + * </p>
    + *
    + * Usage <code>SimpleCommunityDetection &lt;edge path&gt; &lt;result path&gt;
    + * &lt;number of iterations&gt;</code><br>
    + * If no parameters are provided, the program is run with default data from
    + * {@link org.apache.flink.graph.example.utils.SimpleCommunityDetectionData}
    + */
    +public class SimpleCommunityDetection implements GraphAlgorithm<Long, Tuple2<Long, Double>, Double> {
    +
    +   private Integer maxIterations;
    +
    +   public SimpleCommunityDetection(Integer maxIterations) {
    +
    +           this.maxIterations = maxIterations;
    +   }
    +
    +   @Override
    +   public Graph<Long, Tuple2<Long, Double>, Double> run(Graph<Long, Tuple2<Long, Double>, Double> graph) {
    +
    +           Graph<Long, Tuple2<Long, Double>, Double> undirectedGraph = graph.getUndirected();
    +
    +           VertexCentricIteration<Long, Tuple2<Long, Double>, Tuple2<Long, Double>, Double>
    +                           iteration = undirectedGraph.createVertexCentricIteration(new VertexLabelUpdater(),
    +                           new LabelMessenger(), maxIterations);
    +
    +           return undirectedGraph.runVertexCentricIteration(iteration);
    +   }
    +
    +   public static final class VertexLabelUpdater extends VertexUpdateFunction<Long, Tuple2<Long, Double>, Tuple2<Long, Double>> {
    +
    +           @Override
    +           public void updateVertex(Long vertexKey, Tuple2<Long, Double> labelScore,
    +                                                           MessageIterator<Tuple2<Long, Double>> inMessages) throws Exception {
    +
    +                   // we would like these two maps to be ordered
    +                   Map<Long, Double> receivedLabelsWithScores = new TreeMap<Long, Double>();
    +                   Map<Long, Double> labelsWithHighestScore = new TreeMap<Long, Double>();
    +
    +                   for (Tuple2<Long, Double> message : inMessages) {
    +                           // split the message into received label and score
    +                           Long receivedLabel = message.f0;
    +                           Double receivedScore = message.f1;
    +
    +                           // if the label was received before
    +                           if (receivedLabelsWithScores.containsKey(receivedLabel)) {
    +                                   Double newScore = receivedScore + receivedLabelsWithScores.get(receivedLabel);
    +                                   receivedLabelsWithScores.put(receivedLabel, newScore);
    +                           } else {
    +                                   // first time we see the label
    +                                   receivedLabelsWithScores.put(receivedLabel, receivedScore);
    +                           }
    +
    +                           // store the labels with the highest scores
    +                           if (labelsWithHighestScore.containsKey(receivedLabel)) {
    +                                   Double currentScore = labelsWithHighestScore.get(receivedLabel);
    +                                   if (currentScore < receivedScore) {
    +                                           // record the highest score
    +                                           labelsWithHighestScore.put(receivedLabel, receivedScore);
    +                                   }
    +                           } else {
    +                                   // first time we see this label
    +                                   labelsWithHighestScore.put(receivedLabel, receivedScore);
    +                           }
    +                   }
    +
    +                   if(receivedLabelsWithScores.size() > 0) {
    +                           // find the label with the highest score from the ones received
    +                           Double maxScore = -Double.MAX_VALUE;
    +                           Long maxScoreLabel = labelScore.f0;
    +                           for (Long curLabel : receivedLabelsWithScores.keySet()) {
    +
    +                                   if (receivedLabelsWithScores.get(curLabel) > maxScore) {
    +                                           maxScore = receivedLabelsWithScores.get(curLabel);
    +                                           maxScoreLabel = curLabel;
    +                                   }
    +                           }
    +
    +                           // find the highest score of maxScoreLabel
    +                           Double highestScore = labelsWithHighestScore.get(maxScoreLabel);
    +                           // re-score the new label
    +                           if (!maxScoreLabel.equals(labelScore.f0)) {
    +                                   // delta = 0.5
    +                                   highestScore -= 0.5f / getSuperstepNumber();
    --- End diff --
    
    I think delta should be a parameter, and the fact that we decrease it by
    dividing by the superstep number should be explained in the description of
    the algorithm at the beginning.
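    A minimal sketch of what a configurable delta could look like, written as
    plain Java without Flink dependencies so it runs on its own. The names
    `LabelUpdateSketch`, `LabelScore` and `update` are hypothetical. Keeping the
    score unchanged when the vertex keeps its current label is an assumption,
    because the diff is cut off before that branch. With delta as a parameter, a
    vertex that switches to a new label in superstep t gets the score
    highestScore - delta / t, so the penalty shrinks as the iteration proceeds.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical, Flink-free sketch of the per-vertex update rule in the diff,
 * with the hard-coded 0.5 replaced by a configurable delta.
 */
final class LabelUpdateSketch {

    /** (label, score) pair standing in for {@code Tuple2<Long, Double>}. */
    static final class LabelScore {
        final long label;
        final double score;

        LabelScore(long label, double score) {
            this.label = label;
            this.score = score;
        }
    }

    private final double delta;

    LabelUpdateSketch(double delta) {
        this.delta = delta;
    }

    /**
     * Returns the vertex's new (label, score) given its current value, the
     * messages received in this superstep, and the 1-based superstep number.
     */
    LabelScore update(LabelScore current, List<LabelScore> messages, int superstep) {
        if (messages.isEmpty()) {
            return current; // nothing received: keep the current value
        }

        // summed score per label, and the highest single score seen per label
        TreeMap<Long, Double> summed = new TreeMap<>();
        Map<Long, Double> highest = new TreeMap<>();
        for (LabelScore m : messages) {
            summed.merge(m.label, m.score, Double::sum);
            highest.merge(m.label, m.score, Math::max);
        }

        // label with the largest summed score; the TreeMap iterates in ascending
        // key order and only a strictly larger sum wins, so ties go to the
        // smallest label id
        long bestLabel = summed.firstKey();
        double bestSum = summed.get(bestLabel);
        for (Map.Entry<Long, Double> e : summed.entrySet()) {
            if (e.getValue() > bestSum) {
                bestSum = e.getValue();
                bestLabel = e.getKey();
            }
        }

        double newScore = highest.get(bestLabel);
        // penalize only a change of label (assumption: no penalty otherwise);
        // dividing by the superstep number shrinks the penalty over time
        if (bestLabel != current.label) {
            newScore -= delta / superstep;
        }
        return new LabelScore(bestLabel, newScore);
    }
}
```

    In the Flink class, the same value would presumably be passed through the
    `SimpleCommunityDetection` constructor and handed to `VertexLabelUpdater`;
    that wiring is not shown here. Comparing labels as primitive `long` values
    also sidesteps the reference comparison of boxed `Long` objects.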


> Add Community Detection Library and Example
> -------------------------------------------
>
>                 Key: FLINK-1726
>                 URL: https://issues.apache.org/jira/browse/FLINK-1726
>             Project: Flink
>          Issue Type: Task
>          Components: Gelly
>    Affects Versions: 0.9
>            Reporter: Andra Lungu
>            Assignee: Andra Lungu
>
> Community detection paper: http://arxiv.org/pdf/0808.2633.pdf



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)