[FLINK-1201] [gelly] added label propagation in library
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de2aa578 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de2aa578 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de2aa578 Branch: refs/heads/master Commit: de2aa5784af3fae4217fdca478ff8b8824b8be50 Parents: 8ff94da Author: vasia <vasilikikala...@gmail.com> Authored: Mon Jan 5 17:50:29 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 11 10:46:15 2015 +0100 ---------------------------------------------------------------------- .../org/apache/flink/graph/GraphAlgorithm.java | 5 ++ .../flink/graph/library/LabelPropagation.java | 82 ++++++++++++++++++++ 2 files changed, 87 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/de2aa578/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java index c2cbd71..2f5de95 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java @@ -3,6 +3,11 @@ package flink.graphs; import java.io.Serializable; +/** + * @param <K> key type + * @param <VV> vertex value type + * @param <EV> edge value type + */ public interface GraphAlgorithm<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> { http://git-wip-us.apache.org/repos/asf/flink/blob/de2aa578/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
new file mode 100644
index 0000000..430ccbb
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -0,0 +1,82 @@
+package flink.graphs.library;
+
+import flink.graphs.*;
+
+import org.apache.flink.spargel.java.MessageIterator;
+import org.apache.flink.spargel.java.MessagingFunction;
+import org.apache.flink.spargel.java.VertexUpdateFunction;
+import org.apache.flink.types.NullValue;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Label propagation: each vertex repeatedly adopts the most frequent label among the
+ * labels sent by its neighbors; {@code maxIterations} is the vertex-centric iteration limit.
+ */
+@SuppressWarnings("serial")
+public class LabelPropagation<K extends Comparable<K> & Serializable> implements GraphAlgorithm<K, Long, NullValue> {
+
+	private final int maxIterations;
+
+	public LabelPropagation(int maxIterations) {
+		this.maxIterations = maxIterations;
+	}
+
+	@Override
+	public Graph<K, Long, NullValue> run(Graph<K, Long, NullValue> input) {
+		// iteratively adopt the most frequent label among the neighbors of each vertex
+		return input.runVertexCentricIteration(
+				new UpdateVertexLabel<K>(),
+				new SendNewLabelToNeighbors<K>(),
+				maxIterations
+		);
+	}
+
+	/**
+	 * Adopts the most frequent label among the in-neighbors; ties (the current label competes
+	 * with frequency 1) go to the highest label, independent of map or message order.
+	 */
+	public static final class UpdateVertexLabel<K extends Comparable<K> & Serializable>
+		extends VertexUpdateFunction<K, Long, Long> {
+
+		@Override
+		public void updateVertex(K vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
+			Map<Long, Long> labelsWithFrequencies = new HashMap<Long, Long>();
+
+			// count how often each label was received
+			for (Long msg : inMessages) {
+				Long currentFreq = labelsWithFrequencies.get(msg);
+				labelsWithFrequencies.put(msg, currentFreq == null ? 1L : currentFreq + 1);
+			}
+
+			long maxFrequency = 1;
+			long mostFrequentLabel = vertexValue; // the current label competes as if seen once
+
+			// select the most frequent label; equal frequencies go to the highest label
+			for (Entry<Long, Long> entry : labelsWithFrequencies.entrySet()) {
+				long frequency = entry.getValue();
+				long label = entry.getKey();
+				if (frequency > maxFrequency || (frequency == maxFrequency && label > mostFrequentLabel)) {
+					maxFrequency = frequency;
+					mostFrequentLabel = label;
+				}
+			}
+
+			// set the new vertex value
+			setNewVertexValue(mostFrequentLabel);
+		}
+	}
+
+	/** Sends the vertex label to all out-neighbors. */
+	public static final class SendNewLabelToNeighbors<K extends Comparable<K> & Serializable>
+		extends MessagingFunction<K, Long, Long, NullValue> {
+
+		@Override
+		public void sendMessages(K vertexKey, Long newLabel) {
+			sendMessageToAllNeighbors(newLabel);
+		}
+	}
+}
\ No newline at end of file