[ https://issues.apache.org/jira/browse/FLINK-1707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15883256#comment-15883256 ]
ASF GitHub Bot commented on FLINK-1707: --------------------------------------- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2885#discussion_r102994488 --- Diff: flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ap/AffinityPropagationBulk.java --- @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.examples.java.ap; + +import org.apache.flink.api.common.aggregators.ConvergenceCriterion; +import org.apache.flink.api.common.aggregators.LongSumAggregator; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichJoinFunction; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.IterativeDataSet; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.examples.java.ap.util.AffinityPropagationData; +import org.apache.flink.types.DoubleValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.util.Collector; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; + +/** + * Created by joseprubio on 9/22/16. 
+ */ + +public class AffinityPropagationBulk { + + private static final double DAMPING_FACTOR = 0.9; + private static final double CONVERGENCE_THRESHOLD = 0.12; + private static final String CONVERGENCE_MESSAGES = "message convergence"; + + public static void main(String[] args) throws Exception { + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableObjectReuse(); + + // Get input similarities Tuple3<src, target, similarity> + DataSet<Tuple3<LongValue, LongValue, DoubleValue>> similarities = + AffinityPropagationData.getTuplesFromFile(env); + + // Init input to iteration + DataSet<Tuple4<LongValue, LongValue, DoubleValue, DoubleValue>> initMessages + = similarities.map(new InitMessage()); + + // Iterate + IterativeDataSet<Tuple4<LongValue, LongValue, DoubleValue, DoubleValue>> messages + = initMessages.iterate(20); + + // Create aggregator + messages.registerAggregationConvergenceCriterion(CONVERGENCE_MESSAGES, new LongSumAggregator(), + new MessageConvergence(similarities.count() * 2)); + + // Start responsibility message calculation + // r(i,k) <- s(i,k) - max {a(i,K) + s(i,K)} st K != k + // Iterate over Tuple6 <Source, Target, Responsibility , Availability, IsExemplar, ConvergenceCounter> + + DataSet<Tuple3<LongValue, LongValue, DoubleValue>> responsibilities = similarities + + // Get a list of a(i,K) + s(i,K) values joining similarities with messages + .join(messages).where("f0","f1").equalTo("f0","f1").with(new joinAvailabilitySimilarity()) + + // Get a dataset with 2 higher values + .groupBy("f1").sortGroup("f2", Order.DESCENDING).first(2) + + // Create a Tuple4<Trg, MaxValue, MaxNeighbour, SecondMaxValue> reducing the 2 tuples with higher values + .groupBy("f1").reduceGroup(new responsibilityReduceGroup()) + + // Calculate the R messages "r(i,k) <- s(i,k) - value" getting "value" joining + // similarities with previous tuple + .leftOuterJoin(similarities).where("f0").equalTo("f1").with(new 
responsibilityValue()) + + // Responsibility damping + .join(messages).where("f0","f1").equalTo("f1","f0").with(new dampedRValue(DAMPING_FACTOR, CONVERGENCE_THRESHOLD)); + + // Start availability message calculation + // a(i,k) <- min {0, r(k,k) + sum{max{0,r(I,k)}} I st I not in {i,k} + // a(k,k) <- sum{max{0,r(I,k)} I st I not in {i,k} + + DataSet<Tuple4<LongValue, LongValue, DoubleValue, DoubleValue>> availabilities = responsibilities + + // Get the sum of the positive responsibilities and the self responsibility per target + .groupBy("f1").reduceGroup(new availabilityReduceGroup()) + + // Calculate the availability + .leftOuterJoin(responsibilities).where("f0").equalTo("f1").with(new availabilityValue()) + + // Availability damping + .join(messages).where("f0","f1").equalTo("f0","f1").with(new dampedAValue(DAMPING_FACTOR, CONVERGENCE_THRESHOLD)); + + // End iteration + DataSet<Tuple4<LongValue, LongValue, DoubleValue, DoubleValue>> finalMessages = + messages.closeWith(availabilities); + + // Get exemplars + DataSet<Tuple4<LongValue, LongValue, DoubleValue, DoubleValue>> + exemplars = finalMessages.filter(new FilterExemplars()); + + // Get clusters + DataSet<Tuple3<LongValue, LongValue, DoubleValue>> clusters = exemplars + .join(similarities).where("f0").equalTo("f1").projectSecond(0,1,2); + + // Refine clusters assigning exemplars to themselves + DataSet<Tuple3<LongValue, LongValue, DoubleValue>> refinedClusters = clusters + .groupBy("f0").maxBy(2) + .leftOuterJoin(exemplars).where("f0").equalTo("f0").with(new refineClusters()); + + } + + // Init input messages + private static class InitMessage implements MapFunction<Tuple3<LongValue, LongValue, DoubleValue>, + Tuple4<LongValue, LongValue, DoubleValue, DoubleValue>> { + + Tuple4<LongValue, LongValue, DoubleValue, DoubleValue> output = + new Tuple4<>(new LongValue(), new LongValue(), new DoubleValue(), new DoubleValue()); + + @Override + public Tuple4<LongValue, LongValue, DoubleValue, DoubleValue> + 
map(Tuple3<LongValue, LongValue, DoubleValue> in) { + output.f0.setValue(in.f0.getValue()); --- End diff -- Can change to `output.f0 = in.f0` and above `Tuple4<LongValue, LongValue, DoubleValue, DoubleValue> output = new Tuple4<>(null, null, new DoubleValue(), new DoubleValue());` > Add an Affinity Propagation Library Method > ------------------------------------------ > > Key: FLINK-1707 > URL: https://issues.apache.org/jira/browse/FLINK-1707 > Project: Flink > Issue Type: New Feature > Components: Gelly > Reporter: Vasia Kalavri > Assignee: Josep Rubió > Priority: Minor > Labels: requires-design-doc > Attachments: Binary_Affinity_Propagation_in_Flink_design_doc.pdf > > > This issue proposes adding an implementation of the Affinity Propagation > algorithm as a Gelly library method and a corresponding example. > The algorithm is described in paper [1] and a description of a vertex-centric > implementation can be found in [2]. > [1]: http://www.psi.toronto.edu/affinitypropagation/FreyDueckScience07.pdf > [2]: http://event.cwi.nl/grades2014/00-ching-slides.pdf > Design doc: > https://docs.google.com/document/d/1QULalzPqMVICi8jRVs3S0n39pell2ZVc7RNemz_SGA4/edit?usp=sharing > Example spreadsheet: > https://docs.google.com/spreadsheets/d/1CurZCBP6dPb1IYQQIgUHVjQdyLxK0JDGZwlSXCzBcvA/edit?usp=sharing > Graph: > https://docs.google.com/drawings/d/1PC3S-6AEt2Gp_TGrSfiWzkTcL7vXhHSxvM6b9HglmtA/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)