[
https://issues.apache.org/jira/browse/FLINK-1707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15341823#comment-15341823
]
ASF GitHub Bot commented on FLINK-1707:
---------------------------------------
Github user vasia commented on a diff in the pull request:
https://github.com/apache/flink/pull/2053#discussion_r67873916
--- Diff:
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java
---
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgesFunction;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+
+/**
+ * This is an implementation of the Binary Affinity Propagation algorithm
using a scatter-gather iteration.
+ * Note that is not the original Affinity Propagation.
+ *
+ * The input is an undirected graph where the vertices are the points to
be clustered and the edge weights are the
+ * similarities of these points among them.
+ *
+ * The output is a Dataset of Tuple2, where f0 is the point id and f1 is
the exemplar, so the clusters will be the
+ * the Tuples grouped by f1
+ *
+ * @see <a
href="http://www.psi.toronto.edu/pubs2/2009/NC09%20-%20SimpleAP.pdf">
+ */
+
+@SuppressWarnings("serial")
+public class AffinityPropagation implements
GraphAlgorithm<Long,NullValue,Double,DataSet<Tuple2<Long, Long>>> {
+
+ private static Integer maxIterations;
+ private static float damping;
+ private static float epsilon;
+
+ /**
+ * Creates a new AffinityPropagation instance algorithm instance.
+ *
+ * @param maxIterations The maximum number of iterations to run
+ * @param damping Damping factor.
+ * @param epsilon Epsilon factor. Do not send message to a neighbor if
the new message
+ * has not changed more than epsilon.
+ */
+ public AffinityPropagation(Integer maxIterations, float damping, float
epsilon) {
+ this.maxIterations = maxIterations;
+ this.damping = damping;
+ this.epsilon = epsilon;
+ }
+
+ @Override
+ public DataSet<Tuple2<Long, Long>> run(Graph<Long, NullValue, Double>
input) throws Exception {
+
+ // Create E and I AP vertices
+ DataSet<Vertex<Long, APVertexValue>> verticesWithAllInNeighbors
=
+ input.groupReduceOnEdges(new InitAPVertex(),
EdgeDirection.IN);
+
+ List<Vertex<Long, APVertexValue>> APvertices =
verticesWithAllInNeighbors.collect();
+
+ // Create E and I AP edges. Could this be done with some gelly
functionality?
+ List<Edge<Long, NullValue>> APedges = new ArrayList<>();
+
+ for(int i = 1; i < input.numberOfVertices() + 1; i++){
+ for(int j = 1; j < input.numberOfVertices() + 1; j++){
+ APedges.add(new Edge<>(i * 10L, j * 10L + 1,
NullValue.getInstance()));
--- End diff --
Similarly, this is a potentially very large list: the nested loop adds one edge
for every pair of vertices (numberOfVertices² edges in total), so it might not
fit in memory...
> Add an Affinity Propagation Library Method
> ------------------------------------------
>
> Key: FLINK-1707
> URL: https://issues.apache.org/jira/browse/FLINK-1707
> Project: Flink
> Issue Type: New Feature
> Components: Gelly
> Reporter: Vasia Kalavri
> Assignee: Josep Rubió
> Priority: Minor
> Labels: requires-design-doc
> Attachments: Binary_Affinity_Propagation_in_Flink_design_doc.pdf
>
>
> This issue proposes adding an implementation of the Affinity Propagation
> algorithm as a Gelly library method and a corresponding example.
> The algorithm is described in paper [1] and a description of a vertex-centric
> implementation can be found in [2].
> [1]: http://www.psi.toronto.edu/affinitypropagation/FreyDueckScience07.pdf
> [2]: http://event.cwi.nl/grades2014/00-ching-slides.pdf
> Design doc:
> https://docs.google.com/document/d/1QULalzPqMVICi8jRVs3S0n39pell2ZVc7RNemz_SGA4/edit?usp=sharing
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)