Repository: flink Updated Branches: refs/heads/master 3ab97ae3d -> 45c3d9b76
[FLINK-4129] [gelly] HITSAlgorithm should test for element-wise convergence Removes the example HITSAlgorithm. The Gelly library includes the more performant HITS algorithm. This closes #2663 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/45c3d9b7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/45c3d9b7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/45c3d9b7 Branch: refs/heads/master Commit: 45c3d9b7638a65a14fd5a67d6027b797990f71ee Parents: 8085aa9 Author: Greg Hogan <[email protected]> Authored: Wed Oct 19 13:39:15 2016 -0400 Committer: vasia <[email protected]> Committed: Fri Oct 21 12:33:50 2016 +0200 ---------------------------------------------------------------------- .../flink/graph/examples/HITSAlgorithm.java | 254 ------------------- .../flink/graph/examples/data/HITSData.java | 71 ------ .../graph/library/HITSAlgorithmITCase.java | 106 -------- 3 files changed, 431 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/45c3d9b7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java deleted file mode 100644 index ff5a2e9..0000000 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.examples; - -import org.apache.flink.api.common.aggregators.DoubleSumAggregator; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.EdgeDirection; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.spargel.GatherFunction; -import org.apache.flink.graph.spargel.MessageIterator; -import org.apache.flink.graph.spargel.ScatterFunction; -import org.apache.flink.graph.spargel.ScatterGatherConfiguration; -import org.apache.flink.types.DoubleValue; -import org.apache.flink.util.Preconditions; - -/** - * This is an implementation of HITS algorithm, using a scatter-gather iteration. - * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters, - * hubs and authorities. A good hub represents a page that points to many other pages, and a good authority - * represented a page that is linked by many different hubs. - * Each vertex has a value of Tuple2 type, the first field is hub score and the second field is authority score. - * The implementation sets same score to every vertex and adds the reverse edge to every edge at the beginning. - * <p> - * - * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a> - */ -public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Tuple2<DoubleValue, DoubleValue>>>> { - - private final static int MAXIMUMITERATION = (Integer.MAX_VALUE - 1) / 2; - private final static double MINIMUMTHRESHOLD = Double.MIN_VALUE; - - private int maxIterations; - private double convergeThreshold; - - /** - * Create an instance of HITS algorithm. - * - * @param maxIterations the maximum number of iterations - */ - public HITSAlgorithm(int maxIterations) { - this(maxIterations, MINIMUMTHRESHOLD); - } - - /** - * Create an instance of HITS algorithm. - * - * @param convergeThreshold convergence threshold for sum of scores to control whether the iteration should be stopped - */ - public HITSAlgorithm(double convergeThreshold) { - this(MAXIMUMITERATION, convergeThreshold); - } - - /** - * Creates an instance of HITS algorithm. - * - * @param maxIterations the maximum number of iterations - * @param convergeThreshold convergence threshold for sum of scores to control whether the iteration should be stopped - */ - public HITSAlgorithm(int maxIterations, double convergeThreshold) { - Preconditions.checkArgument(maxIterations > 0, "Number of iterations must be greater than zero."); - Preconditions.checkArgument(convergeThreshold > 0.0, "Convergence threshold must be greater than zero."); - this.maxIterations = maxIterations * 2 + 1; - this.convergeThreshold = convergeThreshold; - } - - @Override - public DataSet<Vertex<K, Tuple2<DoubleValue, DoubleValue>>> run(Graph<K, VV, EV> graph) throws Exception { - Graph<K, Tuple2<DoubleValue, DoubleValue>, Boolean> newGraph = graph - .mapEdges(new AuthorityEdgeMapper<K, EV>()) - .union(graph.reverse().mapEdges(new HubEdgeMapper<K, EV>())) - .mapVertices(new VertexInitMapper<K, VV>()); - - ScatterGatherConfiguration parameter = new ScatterGatherConfiguration(); - parameter.setDirection(EdgeDirection.OUT); - parameter.setOptNumVertices(true); - parameter.registerAggregator("updatedValueSum", new DoubleSumAggregator()); - parameter.registerAggregator("authorityValueSum", new DoubleSumAggregator()); - parameter.registerAggregator("diffValueSum", new DoubleSumAggregator()); - - return newGraph - .runScatterGatherIteration(new MessageUpdate<K>(maxIterations), - new VertexUpdate<K>(maxIterations, convergeThreshold), maxIterations, parameter) - .getVertices(); - } - - /** - * Distributes the value of a vertex among all neighbor vertices and sum all the - * value in every superstep. - */ - private static final class MessageUpdate<K> extends ScatterFunction<K, Tuple2<DoubleValue, DoubleValue>, Double, Boolean> { - private int maxIteration; - - public MessageUpdate(int maxIteration) { - this.maxIteration = maxIteration; - } - - @Override - public void sendMessages(Vertex<K, Tuple2<DoubleValue, DoubleValue>> vertex) { - // in the first iteration, no aggregation to call, init sum with value of vertex - double iterationValueSum = 1.0; - - if (getSuperstepNumber() > 1) { - iterationValueSum = Math.sqrt(((DoubleValue) getPreviousIterationAggregate("updatedValueSum")).getValue()); - } - for (Edge<K, Boolean> edge : getEdges()) { - if (getSuperstepNumber() != maxIteration) { - if (getSuperstepNumber() % 2 == 1) { - if (edge.getValue()) { - sendMessageTo(edge.getTarget(), vertex.getValue().f0.getValue() / iterationValueSum); - } - } else { - if (!edge.getValue()) { - sendMessageTo(edge.getTarget(), vertex.getValue().f1.getValue() / iterationValueSum); - } - } - } else { - if (!edge.getValue()) { - sendMessageTo(edge.getTarget(), iterationValueSum); - } - } - } - } - } - - /** - * Function that updates the value of a vertex by summing up the partial - * values from all messages and normalize the value. - */ - private static final class VertexUpdate<K> extends GatherFunction<K, Tuple2<DoubleValue, DoubleValue>, Double> { - private int maxIteration; - private double convergeThreshold; - private DoubleSumAggregator updatedValueSumAggregator; - private DoubleSumAggregator authoritySumAggregator; - private DoubleSumAggregator diffSumAggregator; - - public VertexUpdate(int maxIteration, double convergeThreshold) { - this.maxIteration = maxIteration; - this.convergeThreshold = convergeThreshold; - } - - @Override - public void preSuperstep() { - updatedValueSumAggregator = getIterationAggregator("updatedValueSum"); - authoritySumAggregator = getIterationAggregator("authorityValueSum"); - diffSumAggregator = getIterationAggregator("diffValueSum"); - } - - @Override - public void updateVertex(Vertex<K, Tuple2<DoubleValue, DoubleValue>> vertex, MessageIterator<Double> inMessages) { - double updateValue = 0; - - for (double element : inMessages) { - if (getSuperstepNumber() == maxIteration) { - updateValue = element; - break; - } - updateValue += element; - } - updatedValueSumAggregator.aggregate(Math.pow(updateValue, 2)); - - // in the first iteration, no aggregation to call, init sum with value of vertex - double iterationValueSum = 1.0; - - DoubleValue newHubValue = vertex.getValue().f0; - DoubleValue newAuthorityValue = vertex.getValue().f1; - - if (getSuperstepNumber() > 1) { - iterationValueSum = Math.sqrt(((DoubleValue) getPreviousIterationAggregate("updatedValueSum")).getValue()); - } - if (getSuperstepNumber() < maxIteration) { - if (getSuperstepNumber() % 2 == 1) { - - //in the first iteration, the diff is the authority value of each vertex - double previousAuthAverage = 1.0; - double diffValueSum = 1.0 * getNumberOfVertices(); - if (getSuperstepNumber() > 1) { - previousAuthAverage = ((DoubleValue) getPreviousIterationAggregate("authorityValueSum")).getValue() / getNumberOfVertices(); - diffValueSum = ((DoubleValue) getPreviousIterationAggregate("diffValueSum")).getValue(); - } - authoritySumAggregator.aggregate(previousAuthAverage); - - if (diffValueSum > convergeThreshold) { - newHubValue.setValue(newHubValue.getValue() / iterationValueSum); - newAuthorityValue.setValue(updateValue); - } else { - - //scores are converged and stop iteration - maxIteration = getSuperstepNumber(); - newHubValue.setValue(newHubValue.getValue() / iterationValueSum); - } - } else { - newHubValue.setValue(updateValue); - newAuthorityValue.setValue(newAuthorityValue.getValue() / iterationValueSum); - authoritySumAggregator.aggregate(newAuthorityValue.getValue()); - double previousAuthAverage = ((DoubleValue) getPreviousIterationAggregate("authorityValueSum")).getValue() / getNumberOfVertices(); - - // count the diff value of sum of authority scores - diffSumAggregator.aggregate(previousAuthAverage - newAuthorityValue.getValue()); - } - setNewVertexValue(new Tuple2<>(newHubValue, newAuthorityValue)); - } else if (getSuperstepNumber() == maxIteration) { - - //final iteration to normalize hub score - newHubValue.setValue(newHubValue.getValue() / iterationValueSum); - setNewVertexValue(new Tuple2<>(newHubValue, newAuthorityValue)); - } - } - } - - private static class VertexInitMapper<K, VV> implements MapFunction<Vertex<K, VV>, Tuple2<DoubleValue, DoubleValue>> { - private Tuple2<DoubleValue, DoubleValue> initVertexValue = new Tuple2<>(new DoubleValue(1.0), new DoubleValue(1.0)); - - public Tuple2<DoubleValue, DoubleValue> map(Vertex<K, VV> value) { - //init hub and authority value of each vertex - return initVertexValue; - } - } - - private static class AuthorityEdgeMapper<K, EV> implements MapFunction<Edge<K, EV>, Boolean> { - public Boolean map(Edge<K, EV> edge) { - // mark edge as true for authority updating - return true; - } - } - - private static class HubEdgeMapper<K, EV> implements MapFunction<Edge<K, EV>, Boolean> { - public Boolean map(Edge<K, EV> edge) { - // mark edge as false for hub updating - return false; - } - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/45c3d9b7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/HITSData.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/HITSData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/HITSData.java deleted file mode 100644 index 7f40856..0000000 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/HITSData.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.examples.data; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Vertex; -import org.apache.flink.types.NullValue; - -import java.util.ArrayList; -import java.util.List; - -/** - * Provides the data set used for the HITS test program. - */ -public class HITSData { - - public static final String VALUE_AFTER_10_ITERATIONS = "1,0.70710678,3.12608866E-8\n" + - "2,1.29486832E-8,0.70710678\n" + - "3,1.29486832E-8,0.49999999\n" + - "4,0.50000001,0.49999999\n" + - "5,0.50000001,3.12608866E-8\n"; - - - private HITSData() {} - - public static final DataSet<Vertex<Long, Double>> getVertexDataSet(ExecutionEnvironment env) { - - List<Vertex<Long, Double>> vertices = new ArrayList<>(); - vertices.add(new Vertex<>(1L, 1.0)); - vertices.add(new Vertex<>(2L, 2.0)); - vertices.add(new Vertex<>(3L, 3.0)); - vertices.add(new Vertex<>(4L, 4.0)); - vertices.add(new Vertex<>(5L, 5.0)); - - return env.fromCollection(vertices); - } - - public static final DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) { - - List<Edge<Long, NullValue>> edges = new ArrayList<>(); - edges.add(new Edge<>(2L, 1L, NullValue.getInstance())); - edges.add(new Edge<>(5L, 2L, NullValue.getInstance())); - edges.add(new Edge<>(5L, 4L, NullValue.getInstance())); - edges.add(new Edge<>(4L, 3L, NullValue.getInstance())); - edges.add(new Edge<>(4L, 2L, NullValue.getInstance())); - edges.add(new Edge<>(1L, 4L, NullValue.getInstance())); - edges.add(new Edge<>(1L, 2L, NullValue.getInstance())); - edges.add(new Edge<>(1L, 3L, NullValue.getInstance())); - edges.add(new Edge<>(3L, 5L, NullValue.getInstance())); - - return env.fromCollection(edges); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/45c3d9b7/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java deleted file mode 100644 index b2526ae..0000000 --- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.library; - -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.examples.HITSAlgorithm; -import org.apache.flink.graph.examples.data.HITSData; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.types.DoubleValue; -import org.apache.flink.types.NullValue; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.Arrays; -import java.util.List; - -@RunWith(Parameterized.class) -public class HITSAlgorithmITCase extends MultipleProgramsTestBase{ - - public HITSAlgorithmITCase(TestExecutionMode mode) { - super(mode); - } - - @Test - public void testHITSWithTenIterations() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Double, NullValue> graph = Graph.fromDataSet( - HITSData.getVertexDataSet(env), - HITSData.getEdgeDataSet(env), - env); - - List<Vertex<Long, Tuple2<DoubleValue, DoubleValue>>> result = graph.run(new HITSAlgorithm<Long, Double, NullValue>(10)).collect(); - - compareWithDelta(result, 1e-7); - } - - @Test - public void testHITSWithConvergeThreshold() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Double, NullValue> graph = Graph.fromDataSet( - HITSData.getVertexDataSet(env), - HITSData.getEdgeDataSet(env), - env); - - List<Vertex<Long, Tuple2<DoubleValue, DoubleValue>>> result = graph.run(new HITSAlgorithm<Long, Double, NullValue>(1e-7)).collect(); - - compareWithDelta(result, 1e-7); - } - - private void compareWithDelta(List<Vertex<Long, Tuple2<DoubleValue, DoubleValue>>> result, double delta) { - - String resultString = ""; - for (Vertex<Long, Tuple2<DoubleValue, DoubleValue>> v : result) { - resultString += v.f0.toString() + "," + v.f1.f0.toString() + "," + v.f1.f1.toString() +"\n"; - } - - String expectedResult = HITSData.VALUE_AFTER_10_ITERATIONS; - - String[] expected = expectedResult.isEmpty() ? new String[0] : expectedResult.split("\n"); - - String[] resultArray = resultString.isEmpty() ? new String[0] : resultString.split("\n"); - - Arrays.sort(expected); - Arrays.sort(resultArray); - - for (int i = 0; i < expected.length; i++) { - String[] expectedFields = expected[i].split(","); - String[] resultFields = resultArray[i].split(","); - - double expectedHub = Double.parseDouble(expectedFields[1]); - double resultHub = Double.parseDouble(resultFields[1]); - - double expectedAuthority = Double.parseDouble(expectedFields[2]); - double resultAuthority = Double.parseDouble(resultFields[2]); - - Assert.assertTrue("Values differ by more than the permissible delta", - Math.abs(expectedHub - resultHub) < delta); - - Assert.assertTrue("Values differ by more than the permissible delta", - Math.abs(expectedAuthority - resultAuthority) < delta); - } - } -}
