[ 
https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15276328#comment-15276328
 ] 

ASF GitHub Bot commented on FLINK-2044:
---------------------------------------

GitHub user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r62493213
  
    --- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java
 ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.graph.utils.NullValueEdgeMapper;
    +import org.apache.flink.types.DoubleValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather 
iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is 
determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to 
many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The 
implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be 
provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a 
job to count the vertices.
    + *
    + * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS 
Algorithm</a>
    + */
    +public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, 
DataSet<Vertex<K, Tuple2<Double, Double>>>> {
    +
    +   private int maxIterations;
    +
    +   /**
    +    * Creates an instance of HITS algorithm.
    +    *
    +    * @param maxIterations the maximum number of iterations
    +    */
    +   public HITSAlgorithm(int maxIterations) {
    +           Preconditions.checkArgument(maxIterations > 0, "The number of 
maximum iteration should be greater than 0.");
    +           this.maxIterations = maxIterations * 2 + 1;
    +   }
    +
    +   @Override
    +   public DataSet<Vertex<K, Tuple2<Double, Double>>> run(Graph<K, VV, EV> 
netGraph) throws Exception {
    +
    +           ScatterGatherConfiguration parameter = new 
ScatterGatherConfiguration();
    +           parameter.setDirection(EdgeDirection.ALL);
    +           parameter.registerAggregator("sumVertexValue", new 
DoubleSumAggregator());
    +
    +           return netGraph
    +                           .mapVertices(new VertexInitMapper<K, VV>())
    +                           .mapEdges(new NullValueEdgeMapper<K, EV>())
    +                           .runScatterGatherIteration(new 
VertexUpdate<K>(maxIterations),
    +                                           new MessageUpdate<K, 
NullValue>(maxIterations), maxIterations, parameter)
    +                           .getVertices();
    +   }
    +
    +   /**
    +    * Function that updates the value of a vertex by summing up the partial
    +    * values from all messages and normalize the value.
    +    */
    +   @SuppressWarnings("serial")
    +   public static final class VertexUpdate<K> extends 
VertexUpdateFunction<K, Tuple2<Double, Double>, Double> {
    +           private int maxIteration;
    +           private DoubleSumAggregator doubleSumAggregator;
    +
    +           public VertexUpdate(int maxIteration) {
    +                   this.maxIteration = maxIteration;
    +           }
    +
    +           @Override
    +           public void preSuperstep() {
    +                   doubleSumAggregator = 
getIterationAggregator("sumVertexValue");
    +           }
    +
    +           @Override
    +           public void updateVertex(Vertex<K, Tuple2<Double, Double>> 
vertex, MessageIterator<Double> inMessages) {
    +                   double updateValue = 0;
    +
    +                   for (double element : inMessages) {
    +                           if (getSuperstepNumber() == maxIteration) {
    +                                   updateValue = element;
    +                                   break;
    +                           }
    +                           updateValue += element;
    +                   }
    +
    +                   // in the first iteration, no aggregation to call, init 
sum with value of vertex
    +                   double iterationValueSum = 1.0;
    +
    +                   if (getSuperstepNumber() > 1) {
    +                           iterationValueSum = Math.sqrt(((DoubleValue) 
getPreviousIterationAggregate("sumVertexValue")).getValue());
    +                   }
    +                   if (getSuperstepNumber() != maxIteration) {
    +                           if (getSuperstepNumber() % 2 == 1) {
    +                                   setNewVertexValue(new Tuple2<Double, 
Double>(vertex.getValue().f0 / iterationValueSum, updateValue));
    +                                   
doubleSumAggregator.aggregate(Math.pow(updateValue, 2));
    +                           } else {
    +                                   setNewVertexValue(new Tuple2<Double, 
Double>(updateValue, vertex.getValue().f1 / iterationValueSum));
    +                                   
doubleSumAggregator.aggregate(Math.pow(updateValue, 2));
    +                           }
    +                   } else {
    +
    +                           //final iteration to normalize hub score
    +                           setNewVertexValue(new Tuple2<Double, 
Double>(vertex.getValue().f0 / iterationValueSum, vertex.getValue().f1));
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Distributes the value of a vertex among all neighbor vertices and 
sum all the
    +    * value in every superstep.
    +    */
    +   @SuppressWarnings("serial")
    +   public static final class MessageUpdate<K, EV> extends 
MessagingFunction<K, Tuple2<Double, Double>, Double, EV> {
    +           private int maxIteration;
    +
    +           public MessageUpdate(int maxIteration) {
    +                   this.maxIteration = maxIteration;
    +           }
    +
    +           @Override
    +           public void sendMessages(Vertex<K, Tuple2<Double, Double>> 
vertex) {
    +
    +                   // in the first iteration, no aggregation to call, init 
sum with value of vertex
    +                   double iterationValueSum = 1.0;
    +
    +                   if (getSuperstepNumber() > 1) {
    +                           iterationValueSum = Math.sqrt(((DoubleValue) 
getPreviousIterationAggregate("sumVertexValue")).getValue());
    +                   }
    +                   for (Edge<K, EV> edge : getEdges()) {
    +                           K messageSource = getSuperstepNumber() % 2 == 1 
? edge.getSource() : edge.getTarget();
    +                           K messageTarget = getSuperstepNumber() % 2 == 1 
? edge.getTarget() : edge.getSource();
    +                           Double messageValue = getSuperstepNumber() % 2 
== 1 ? vertex.getValue().f0 : vertex.getValue().f1;
    +
    +                           if (!messageTarget.equals(vertex.getId())) {
    +                                   if (getSuperstepNumber() != 
maxIteration) {
    +                                           sendMessageTo(messageTarget, 
messageValue / iterationValueSum);
    +
    +                                           // in order to make every 
vertex updated
    +                                           sendMessageTo(messageSource, 
0.0);
    +                                   } else {
    +                                           sendMessageTo(messageSource, 
iterationValueSum);
    +                                   }
    +                           }
    +                   }
    +           }
    +   }
    +
    +   public static class VertexInitMapper<K, VV> implements 
MapFunction<Vertex<K, VV>, Tuple2<Double, Double>> {
    +
    +           private static final long serialVersionUID = 1L;
    +
    +           public Tuple2<Double, Double> map(Vertex<K, VV> value) {
    +                   
    +                   //init hub and authority value of each vertex
    +                   return new Tuple2<Double, Double>(1.0, 1.0);
    --- End diff --
    
    This Tuple2 can be created once as a member variable and returned from every
    call to map(), so that only a single object is instantiated instead of a new one per vertex.


> Implementation of Gelly HITS Algorithm
> --------------------------------------
>
>                 Key: FLINK-2044
>                 URL: https://issues.apache.org/jira/browse/FLINK-2044
>             Project: Flink
>          Issue Type: New Feature
>          Components: Gelly
>            Reporter: Ahamd Javid
>            Assignee: GaoLun
>            Priority: Minor
>
> Implementation of the HITS algorithm in the Gelly API using Java. The feature branch
> can be found here: https://github.com/JavidMayar/flink/commits/HITS



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to