[
https://issues.apache.org/jira/browse/FLINK-1726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377671#comment-14377671
]
ASF GitHub Bot commented on FLINK-1726:
---------------------------------------
Github user vasia commented on a diff in the pull request:
https://github.com/apache/flink/pull/505#discussion_r27017460
--- Diff:
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexCentricIteration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Simple Community Detection Algorithm.
+ *
+ * Initially, each vertex is assigned a tuple formed of its own id along
with a score equal to 1.0, as value.
+ * The vertices propagate their labels and max scores in iterations, each
time adopting the label with the
+ * highest score from the list of received messages. The chosen label is
afterwards re-scored.
+ *
+ * The algorithm converges when vertices no longer update their value or
when the maximum number of iterations
+ * is reached.
+ *<p>
+ * The input files is a plain text file and must be formatted as
follows:
+ * <br>
+ * Edges are represented by tuples of srcVertexId, trgVertexId
which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1 2\n1 3\n</code> defines two edges
1-2 and 1-3
+ * </p>
+ *
+ * Usage <code>SimpleCommunityDetection <edge path> <result
path>
+ * <number of iterations></code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link
org.apache.flink.graph.example.utils.SimpleCommunityDetectionData}
+ */
+public class SimpleCommunityDetection implements GraphAlgorithm<Long,
Tuple2<Long, Double>, Double> {
+
+ private Integer maxIterations;
+
+ public SimpleCommunityDetection(Integer maxIterations) {
+
+ this.maxIterations = maxIterations;
+ }
+
+ @Override
+ public Graph<Long, Tuple2<Long, Double>, Double> run(Graph<Long,
Tuple2<Long, Double>, Double> graph) {
+
+ Graph<Long, Tuple2<Long, Double>, Double> undirectedGraph =
graph.getUndirected();
+
+ VertexCentricIteration<Long, Tuple2<Long, Double>, Tuple2<Long,
Double>, Double>
+ iteration =
undirectedGraph.createVertexCentricIteration(new VertexLabelUpdater(),
+ new LabelMessenger(), maxIterations);
+
+ return undirectedGraph.runVertexCentricIteration(iteration);
+ }
+
+ public static final class VertexLabelUpdater extends
VertexUpdateFunction<Long, Tuple2<Long, Double>, Tuple2<Long, Double>> {
+
+ @Override
+ public void updateVertex(Long vertexKey, Tuple2<Long, Double>
labelScore,
+
MessageIterator<Tuple2<Long, Double>> inMessages) throws Exception {
+
+ // we would like these two maps to be ordered
+ Map<Long, Double> receivedLabelsWithScores = new
TreeMap<Long, Double>();
+ Map<Long, Double> labelsWithHighestScore = new
TreeMap<Long, Double>();
+
+ for (Tuple2<Long, Double> message : inMessages) {
+ // split the message into received label and
score
+ Long receivedLabel = message.f0;
+ Double receivedScore = message.f1;
+
+ // if the label was received before
+ if
(receivedLabelsWithScores.containsKey(receivedLabel)) {
+ Double newScore = receivedScore +
receivedLabelsWithScores.get(receivedLabel);
+
receivedLabelsWithScores.put(receivedLabel, newScore);
+ } else {
+ // first time we see the label
+
receivedLabelsWithScores.put(receivedLabel, receivedScore);
+ }
+
+ // store the labels with the highest scores
+ if
(labelsWithHighestScore.containsKey(receivedLabel)) {
+ Double currentScore =
labelsWithHighestScore.get(receivedLabel);
+ if (currentScore < receivedScore) {
+ // record the highest score
+
labelsWithHighestScore.put(receivedLabel, receivedScore);
+ }
+ } else {
+ // first time we see this label
+
labelsWithHighestScore.put(receivedLabel, receivedScore);
+ }
+ }
+
+ if(receivedLabelsWithScores.size() > 0) {
+ // find the label with the highest score from
the ones received
+ Double maxScore = -Double.MAX_VALUE;
+ Long maxScoreLabel = labelScore.f0;
+ for (Long curLabel :
receivedLabelsWithScores.keySet()) {
+
+ if
(receivedLabelsWithScores.get(curLabel) > maxScore) {
+ maxScore =
receivedLabelsWithScores.get(curLabel);
+ maxScoreLabel = curLabel;
+ }
+ }
+
+ // find the highest score of maxScoreLabel
+ Double highestScore =
labelsWithHighestScore.get(maxScoreLabel);
+ // re-score the new label
+ if (maxScoreLabel != labelScore.f0) {
+ // delta = 0.5
+ highestScore -= 0.5f /
getSuperstepNumber();
--- End diff --
I think delta should be a parameter, and the fact that we decrease it by
dividing by the superstep number should be explained in the description of the
algorithm at the beginning.
> Add Community Detection Library and Example
> -------------------------------------------
>
> Key: FLINK-1726
> URL: https://issues.apache.org/jira/browse/FLINK-1726
> Project: Flink
> Issue Type: Task
> Components: Gelly
> Affects Versions: 0.9
> Reporter: Andra Lungu
> Assignee: Andra Lungu
>
> Community detection paper: http://arxiv.org/pdf/0808.2633.pdf
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)