Repository: flink
Updated Branches:
  refs/heads/master 3ab97ae3d -> 45c3d9b76


[FLINK-4129] [gelly] HITSAlgorithm should test for element-wise convergence

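Rather than fixing the example's convergence check, this removes the HITSAlgorithm
example together with its test data (HITSData) and integration test
(HITSAlgorithmITCase). The Gelly library already includes a more performant HITS
algorithm.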

This closes #2663


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/45c3d9b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/45c3d9b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/45c3d9b7

Branch: refs/heads/master
Commit: 45c3d9b7638a65a14fd5a67d6027b797990f71ee
Parents: 8085aa9
Author: Greg Hogan <[email protected]>
Authored: Wed Oct 19 13:39:15 2016 -0400
Committer: vasia <[email protected]>
Committed: Fri Oct 21 12:33:50 2016 +0200

----------------------------------------------------------------------
 .../flink/graph/examples/HITSAlgorithm.java     | 254 -------------------
 .../flink/graph/examples/data/HITSData.java     |  71 ------
 .../graph/library/HITSAlgorithmITCase.java      | 106 --------
 3 files changed, 431 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/45c3d9b7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java
deleted file mode 100644
index ff5a2e9..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.spargel.GatherFunction;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.ScatterFunction;
-import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.util.Preconditions;
-
-/**
- * This is an implementation of HITS algorithm, using a scatter-gather 
iteration.
- * The user can define the maximum number of iterations. HITS algorithm is 
determined by two parameters,
- * hubs and authorities. A good hub represents a page that points to many 
other pages, and a good authority
- * represented a page that is linked by many different hubs.
- * Each vertex has a value of Tuple2 type, the first field is hub score and 
the second field is authority score.
- * The implementation sets same score to every vertex and adds the reverse 
edge to every edge at the beginning. 
- * <p>
- *
- * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm";>HITS 
Algorithm</a>
- */
-public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, 
DataSet<Vertex<K, Tuple2<DoubleValue, DoubleValue>>>> {
-
-       private final static int MAXIMUMITERATION = (Integer.MAX_VALUE - 1) / 2;
-       private final static double MINIMUMTHRESHOLD = Double.MIN_VALUE;
-
-       private int maxIterations;
-       private double convergeThreshold;
-
-       /**
-        * Create an instance of HITS algorithm.
-        *
-        * @param maxIterations the maximum number of iterations
-        */
-       public HITSAlgorithm(int maxIterations) {
-               this(maxIterations, MINIMUMTHRESHOLD);
-       }
-
-       /**
-        * Create an instance of HITS algorithm.
-        *
-        * @param convergeThreshold convergence threshold for sum of scores to 
control whether the iteration should be stopped
-        */
-       public HITSAlgorithm(double convergeThreshold) {
-               this(MAXIMUMITERATION, convergeThreshold);
-       }
-
-       /**
-        * Creates an instance of HITS algorithm.
-        *
-        * @param maxIterations     the maximum number of iterations
-        * @param convergeThreshold convergence threshold for sum of scores to 
control whether the iteration should be stopped
-        */
-       public HITSAlgorithm(int maxIterations, double convergeThreshold) {
-               Preconditions.checkArgument(maxIterations > 0, "Number of 
iterations must be greater than zero.");
-               Preconditions.checkArgument(convergeThreshold > 0.0, 
"Convergence threshold must be greater than zero.");
-               this.maxIterations = maxIterations * 2 + 1;
-               this.convergeThreshold = convergeThreshold;
-       }
-
-       @Override
-       public DataSet<Vertex<K, Tuple2<DoubleValue, DoubleValue>>> 
run(Graph<K, VV, EV> graph) throws Exception {
-               Graph<K, Tuple2<DoubleValue, DoubleValue>, Boolean> newGraph = 
graph
-                               .mapEdges(new AuthorityEdgeMapper<K, EV>())
-                               .union(graph.reverse().mapEdges(new 
HubEdgeMapper<K, EV>()))
-                               .mapVertices(new VertexInitMapper<K, VV>());
-
-               ScatterGatherConfiguration parameter = new 
ScatterGatherConfiguration();
-               parameter.setDirection(EdgeDirection.OUT);
-               parameter.setOptNumVertices(true);
-               parameter.registerAggregator("updatedValueSum", new 
DoubleSumAggregator());
-               parameter.registerAggregator("authorityValueSum", new 
DoubleSumAggregator());
-               parameter.registerAggregator("diffValueSum", new 
DoubleSumAggregator());
-
-               return newGraph
-                               .runScatterGatherIteration(new 
MessageUpdate<K>(maxIterations),
-                                               new 
VertexUpdate<K>(maxIterations, convergeThreshold), maxIterations, parameter)
-                               .getVertices();
-       }
-
-       /**
-        * Distributes the value of a vertex among all neighbor vertices and 
sum all the
-        * value in every superstep.
-        */
-       private static final class MessageUpdate<K> extends ScatterFunction<K, 
Tuple2<DoubleValue, DoubleValue>, Double, Boolean> {
-               private int maxIteration;
-
-               public MessageUpdate(int maxIteration) {
-                       this.maxIteration = maxIteration;
-               }
-
-               @Override
-               public void sendMessages(Vertex<K, Tuple2<DoubleValue, 
DoubleValue>> vertex) {
-                       // in the first iteration, no aggregation to call, init 
sum with value of vertex
-                       double iterationValueSum = 1.0;
-
-                       if (getSuperstepNumber() > 1) {
-                               iterationValueSum = Math.sqrt(((DoubleValue) 
getPreviousIterationAggregate("updatedValueSum")).getValue());
-                       }
-                       for (Edge<K, Boolean> edge : getEdges()) {
-                               if (getSuperstepNumber() != maxIteration) {
-                                       if (getSuperstepNumber() % 2 == 1) {
-                                               if (edge.getValue()) {
-                                                       
sendMessageTo(edge.getTarget(), vertex.getValue().f0.getValue() / 
iterationValueSum);
-                                               }
-                                       } else {
-                                               if (!edge.getValue()) {
-                                                       
sendMessageTo(edge.getTarget(), vertex.getValue().f1.getValue() / 
iterationValueSum);
-                                               }
-                                       }
-                               } else {
-                                       if (!edge.getValue()) {
-                                               sendMessageTo(edge.getTarget(), 
iterationValueSum);
-                                       }
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Function that updates the value of a vertex by summing up the partial
-        * values from all messages and normalize the value.
-        */
-       private static final class VertexUpdate<K> extends GatherFunction<K, 
Tuple2<DoubleValue, DoubleValue>, Double> {
-               private int maxIteration;
-               private double convergeThreshold;
-               private DoubleSumAggregator updatedValueSumAggregator;
-               private DoubleSumAggregator authoritySumAggregator;
-               private DoubleSumAggregator diffSumAggregator;
-
-               public VertexUpdate(int maxIteration, double convergeThreshold) 
{
-                       this.maxIteration = maxIteration;
-                       this.convergeThreshold = convergeThreshold;
-               }
-
-               @Override
-               public void preSuperstep() {
-                       updatedValueSumAggregator = 
getIterationAggregator("updatedValueSum");
-                       authoritySumAggregator = 
getIterationAggregator("authorityValueSum");
-                       diffSumAggregator = 
getIterationAggregator("diffValueSum");
-               }
-
-               @Override
-               public void updateVertex(Vertex<K, Tuple2<DoubleValue, 
DoubleValue>> vertex, MessageIterator<Double> inMessages) {
-                       double updateValue = 0;
-
-                       for (double element : inMessages) {
-                               if (getSuperstepNumber() == maxIteration) {
-                                       updateValue = element;
-                                       break;
-                               }
-                               updateValue += element;
-                       }
-                       
updatedValueSumAggregator.aggregate(Math.pow(updateValue, 2));
-
-                       // in the first iteration, no aggregation to call, init 
sum with value of vertex
-                       double iterationValueSum = 1.0;
-
-                       DoubleValue newHubValue = vertex.getValue().f0;
-                       DoubleValue newAuthorityValue = vertex.getValue().f1;
-
-                       if (getSuperstepNumber() > 1) {
-                               iterationValueSum = Math.sqrt(((DoubleValue) 
getPreviousIterationAggregate("updatedValueSum")).getValue());
-                       }
-                       if (getSuperstepNumber() < maxIteration) {
-                               if (getSuperstepNumber() % 2 == 1) {
-
-                                       //in the first iteration, the diff is 
the authority value of each vertex
-                                       double previousAuthAverage = 1.0;
-                                       double diffValueSum = 1.0 * 
getNumberOfVertices();
-                                       if (getSuperstepNumber() > 1) {
-                                               previousAuthAverage = 
((DoubleValue) getPreviousIterationAggregate("authorityValueSum")).getValue() / 
getNumberOfVertices();
-                                               diffValueSum = ((DoubleValue) 
getPreviousIterationAggregate("diffValueSum")).getValue();
-                                       }
-                                       
authoritySumAggregator.aggregate(previousAuthAverage);
-
-                                       if (diffValueSum > convergeThreshold) {
-                                               
newHubValue.setValue(newHubValue.getValue() / iterationValueSum);
-                                               
newAuthorityValue.setValue(updateValue);
-                                       } else {
-
-                                               //scores are converged and stop 
iteration
-                                               maxIteration = 
getSuperstepNumber();
-                                               
newHubValue.setValue(newHubValue.getValue() / iterationValueSum);
-                                       }
-                               } else {
-                                       newHubValue.setValue(updateValue);
-                                       
newAuthorityValue.setValue(newAuthorityValue.getValue() / iterationValueSum);
-                                       
authoritySumAggregator.aggregate(newAuthorityValue.getValue());
-                                       double previousAuthAverage = 
((DoubleValue) getPreviousIterationAggregate("authorityValueSum")).getValue() / 
getNumberOfVertices();
-
-                                       // count the diff value of sum of 
authority scores
-                                       
diffSumAggregator.aggregate(previousAuthAverage - newAuthorityValue.getValue());
-                               }
-                               setNewVertexValue(new Tuple2<>(newHubValue, 
newAuthorityValue));
-                       } else if (getSuperstepNumber() == maxIteration) {
-
-                               //final iteration to normalize hub score
-                               newHubValue.setValue(newHubValue.getValue() / 
iterationValueSum);
-                               setNewVertexValue(new Tuple2<>(newHubValue, 
newAuthorityValue));
-                       }
-               }
-       }
-
-       private static class VertexInitMapper<K, VV> implements 
MapFunction<Vertex<K, VV>, Tuple2<DoubleValue, DoubleValue>> {
-               private Tuple2<DoubleValue, DoubleValue> initVertexValue = new 
Tuple2<>(new DoubleValue(1.0), new DoubleValue(1.0));
-
-               public Tuple2<DoubleValue, DoubleValue> map(Vertex<K, VV> 
value) {
-                       //init hub and authority value of each vertex
-                       return initVertexValue;
-               }
-       }
-
-       private static class AuthorityEdgeMapper<K, EV> implements 
MapFunction<Edge<K, EV>, Boolean> {
-               public Boolean map(Edge<K, EV> edge) {
-                       // mark edge as true for authority updating
-                       return true;
-               }
-       }
-
-       private static class HubEdgeMapper<K, EV> implements 
MapFunction<Edge<K, EV>, Boolean> {
-               public Boolean map(Edge<K, EV> edge) {
-                       // mark edge as false for hub updating
-                       return false;
-               }
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/45c3d9b7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/HITSData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/HITSData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/HITSData.java
deleted file mode 100644
index 7f40856..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/HITSData.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples.data;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.types.NullValue;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Provides the data set used for the HITS test program.
- */
-public class HITSData {
-
-       public static final String VALUE_AFTER_10_ITERATIONS = 
"1,0.70710678,3.12608866E-8\n" +
-                                                                               
                                        "2,1.29486832E-8,0.70710678\n" +
-                                                                               
                                        "3,1.29486832E-8,0.49999999\n" +
-                                                                               
                                        "4,0.50000001,0.49999999\n" +
-                                                                               
                                        "5,0.50000001,3.12608866E-8\n";
-
-
-       private HITSData() {}
-
-       public static final DataSet<Vertex<Long, Double>> 
getVertexDataSet(ExecutionEnvironment env) {
-
-               List<Vertex<Long, Double>> vertices = new ArrayList<>();
-               vertices.add(new Vertex<>(1L, 1.0));
-               vertices.add(new Vertex<>(2L, 2.0));
-               vertices.add(new Vertex<>(3L, 3.0));
-               vertices.add(new Vertex<>(4L, 4.0));
-               vertices.add(new Vertex<>(5L, 5.0));
-       
-               return env.fromCollection(vertices);
-       }       
-       
-       public static final DataSet<Edge<Long, NullValue>> 
getEdgeDataSet(ExecutionEnvironment env) {
-               
-               List<Edge<Long, NullValue>> edges = new ArrayList<>();
-               edges.add(new Edge<>(2L, 1L, NullValue.getInstance()));
-               edges.add(new Edge<>(5L, 2L, NullValue.getInstance()));
-               edges.add(new Edge<>(5L, 4L, NullValue.getInstance()));
-               edges.add(new Edge<>(4L, 3L, NullValue.getInstance()));
-               edges.add(new Edge<>(4L, 2L, NullValue.getInstance()));
-               edges.add(new Edge<>(1L, 4L, NullValue.getInstance()));
-               edges.add(new Edge<>(1L, 2L, NullValue.getInstance()));
-               edges.add(new Edge<>(1L, 3L, NullValue.getInstance()));
-               edges.add(new Edge<>(3L, 5L, NullValue.getInstance()));
-               
-               return env.fromCollection(edges);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/45c3d9b7/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java
deleted file mode 100644
index b2526ae..0000000
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.examples.HITSAlgorithm;
-import org.apache.flink.graph.examples.data.HITSData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.NullValue;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class HITSAlgorithmITCase extends MultipleProgramsTestBase{
-       
-       public HITSAlgorithmITCase(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       @Test
-       public void testHITSWithTenIterations() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Double, NullValue> graph = Graph.fromDataSet(
-                               HITSData.getVertexDataSet(env),
-                               HITSData.getEdgeDataSet(env),
-                               env);
-               
-               List<Vertex<Long, Tuple2<DoubleValue, DoubleValue>>> result = 
graph.run(new HITSAlgorithm<Long, Double, NullValue>(10)).collect();
-
-               compareWithDelta(result, 1e-7);
-       }
-
-       @Test
-       public void testHITSWithConvergeThreshold() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Double, NullValue> graph = Graph.fromDataSet(
-                               HITSData.getVertexDataSet(env),
-                               HITSData.getEdgeDataSet(env),
-                               env);
-
-               List<Vertex<Long, Tuple2<DoubleValue, DoubleValue>>> result = 
graph.run(new HITSAlgorithm<Long, Double, NullValue>(1e-7)).collect();
-
-               compareWithDelta(result, 1e-7);
-       }
-
-       private void compareWithDelta(List<Vertex<Long, Tuple2<DoubleValue, 
DoubleValue>>> result, double delta) {
-
-               String resultString = "";
-               for (Vertex<Long, Tuple2<DoubleValue, DoubleValue>> v : result) 
{
-                       resultString += v.f0.toString() + "," + 
v.f1.f0.toString() + "," + v.f1.f1.toString() +"\n";
-               }
-
-               String expectedResult = HITSData.VALUE_AFTER_10_ITERATIONS;
-               
-               String[] expected = expectedResult.isEmpty() ? new String[0] : 
expectedResult.split("\n");
-
-               String[] resultArray = resultString.isEmpty() ? new String[0] : 
resultString.split("\n");
-
-               Arrays.sort(expected);
-               Arrays.sort(resultArray);
-
-               for (int i = 0; i < expected.length; i++) {
-                       String[] expectedFields = expected[i].split(",");
-                       String[] resultFields = resultArray[i].split(",");
-
-                       double expectedHub = 
Double.parseDouble(expectedFields[1]);
-                       double resultHub = Double.parseDouble(resultFields[1]);
-                       
-                       double expectedAuthority = 
Double.parseDouble(expectedFields[2]);
-                       double resultAuthority = 
Double.parseDouble(resultFields[2]);
-
-                       Assert.assertTrue("Values differ by more than the 
permissible delta",
-                                       Math.abs(expectedHub - resultHub) < 
delta);
-
-                       Assert.assertTrue("Values differ by more than the 
permissible delta",
-                                       Math.abs(expectedAuthority - 
resultAuthority) < delta);
-               }
-       }
-}

Reply via email to