http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/EuclideanGraphData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/EuclideanGraphData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/EuclideanGraphData.java
new file mode 100644
index 0000000..2b4277d
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/EuclideanGraphData.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.EuclideanGraphWeighing;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the Euclidean Graph example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class EuclideanGraphData {
+
+	public static final int NUM_VERTICES = 9;
+
+	public static final String VERTICES = "1,1.0,1.0\n" + "2,2.0,2.0\n" + "3,3.0,3.0\n" + "4,4.0,4.0\n" + "5,5.0,5.0\n" +
+			"6,6.0,6.0\n" + "7,7.0,7.0\n" + "8,8.0,8.0\n" + "9,9.0,9.0";
+
+	/**
+	 * Creates the default vertex set: vertex {@code i} (for i = 1..{@link #NUM_VERTICES}) is placed at
+	 * point (i, i), matching {@link #VERTICES}.
+	 *
+	 * @param env the execution environment used to create the data set
+	 * @return the default vertex data set
+	 */
+	public static DataSet<Vertex<Long, EuclideanGraphWeighing.Point>> getDefaultVertexDataSet(ExecutionEnvironment env) {
+
+		List<Vertex<Long, EuclideanGraphWeighing.Point>> vertices = new ArrayList<>(NUM_VERTICES);
+		for (int i = 1; i <= NUM_VERTICES; i++) {
+			vertices.add(new Vertex<>((long) i,
+					new EuclideanGraphWeighing.Point((double) i, (double) i)));
+		}
+
+		return env.fromCollection(vertices);
+	}
+
+	public static final String EDGES = "1,2\n" + "1,4\n" + "2,3\n" + "2,4\n" + "2,5\n" +
+			"3,5\n" + "4,5\n" + "4,6\n" + "5,7\n" + "5,9\n" + "6,7\n" + "6,8\n" +
+			"7,8\n" + "7,9\n" + "8,9";
+
+	/**
+	 * Creates the default edge set. The edges must mirror {@link #EDGES} one-to-one (15 edges, no
+	 * duplicates) so that results line up with {@link #RESULTED_WEIGHTED_EDGES}; all values start at 0.0.
+	 *
+	 * @param env the execution environment used to create the data set
+	 * @return the default edge data set
+	 */
+	public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, Double>> edges = new ArrayList<>();
+		edges.add(new Edge<>(1L, 2L, 0.0));
+		edges.add(new Edge<>(1L, 4L, 0.0));
+		edges.add(new Edge<>(2L, 3L, 0.0));
+		edges.add(new Edge<>(2L, 4L, 0.0));
+		edges.add(new Edge<>(2L, 5L, 0.0));
+		edges.add(new Edge<>(3L, 5L, 0.0));
+		edges.add(new Edge<>(4L, 5L, 0.0));
+		edges.add(new Edge<>(4L, 6L, 0.0));
+		edges.add(new Edge<>(5L, 7L, 0.0));
+		edges.add(new Edge<>(5L, 9L, 0.0));
+		edges.add(new Edge<>(6L, 7L, 0.0));
+		edges.add(new Edge<>(6L, 8L, 0.0));
+		edges.add(new Edge<>(7L, 8L, 0.0));
+		edges.add(new Edge<>(7L, 9L, 0.0));
+		edges.add(new Edge<>(8L, 9L, 0.0));
+
+		return env.fromCollection(edges);
+	}
+
+	public static final String RESULTED_WEIGHTED_EDGES = "1,2,1.4142135623730951\n" + "1,4,4.242640687119285\n" +
+			"2,3,1.4142135623730951\n" + "2,4,2.8284271247461903\n" + "2,5,4.242640687119285\n" + "3,5,2.8284271247461903\n" +
+			"4,5,1.4142135623730951\n" + "4,6,2.8284271247461903\n" + "5,7,2.8284271247461903\n" + "5,9,5.656854249492381\n" +
+			"6,7,1.4142135623730951\n" + "6,8,2.8284271247461903\n" + "7,8,1.4142135623730951\n" + "7,9,2.8284271247461903\n" +
+			"8,9,1.4142135623730951";
+
+	private EuclideanGraphData() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/IncrementalSSSPData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/IncrementalSSSPData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/IncrementalSSSPData.java
new file mode 100644
index 0000000..99e363a
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/IncrementalSSSPData.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the IncrementalSSSP example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class IncrementalSSSPData {
+
+	public static final int NUM_VERTICES = 5;
+
+	public static final String VERTICES = "1,6.0\n" + "2,2.0\n" + "3,3.0\n" + "4,1.0\n" + "5,0.0";
+
+	/** Creates the default vertex set; mirrors {@link #VERTICES}. */
+	public static DataSet<Vertex<Long, Double>> getDefaultVertexDataSet(ExecutionEnvironment env) {
+
+		List<Vertex<Long, Double>> vertices = new ArrayList<>();
+		vertices.add(new Vertex<>(1L, 6.0));
+		vertices.add(new Vertex<>(2L, 2.0));
+		vertices.add(new Vertex<>(3L, 3.0));
+		vertices.add(new Vertex<>(4L, 1.0));
+		vertices.add(new Vertex<>(5L, 0.0));
+
+		return env.fromCollection(vertices);
+	}
+
+	public static final String EDGES = "1,3,3.0\n" + "2,4,3.0\n" + "2,5,2.0\n" + "3,2,1.0\n" + "3,5,5.0\n" +
+			"4,5,1.0";
+
+	/** Creates the default edge set; mirrors {@link #EDGES}. */
+	public static final DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, Double>> edges = new ArrayList<>();
+		edges.add(new Edge<>(1L, 3L, 3.0));
+		edges.add(new Edge<>(2L, 4L, 3.0));
+		edges.add(new Edge<>(2L, 5L, 2.0));
+		edges.add(new Edge<>(3L, 2L, 1.0));
+		edges.add(new Edge<>(3L, 5L, 5.0));
+		edges.add(new Edge<>(4L, 5L, 1.0));
+
+		return env.fromCollection(edges);
+	}
+
+	public static final String EDGES_IN_SSSP = "1,3,3.0\n" + "2,5,2.0\n" + "3,2,1.0\n" + "4,5,1.0";
+
+	/** Creates the edge set listed in {@link #EDGES_IN_SSSP} as a data set. */
+	public static final DataSet<Edge<Long, Double>> getDefaultEdgesInSSSP(ExecutionEnvironment env) {
+
+		List<Edge<Long, Double>> edges = new ArrayList<>();
+		edges.add(new Edge<>(1L, 3L, 3.0));
+		edges.add(new Edge<>(2L, 5L, 2.0));
+		edges.add(new Edge<>(3L, 2L, 1.0));
+		edges.add(new Edge<>(4L, 5L, 1.0));
+
+		return env.fromCollection(edges);
+	}
+
+	public static final String SRC_EDGE_TO_BE_REMOVED = "2";
+
+	public static final String TRG_EDGE_TO_BE_REMOVED = "5";
+
+	public static final String VAL_EDGE_TO_BE_REMOVED = "2.0";
+
+	/** Returns the default edge to be removed; matches the *_EDGE_TO_BE_REMOVED constants (2, 5, 2.0). */
+	public static final Edge<Long, Double> getDefaultEdgeToBeRemoved() {
+
+		return new Edge<>(2L, 5L, 2.0);
+	}
+
+	public static final String RESULTED_VERTICES = "1," + Double.MAX_VALUE + "\n" + "2," + Double.MAX_VALUE + "\n" +
+			"3," + Double.MAX_VALUE + "\n" + "4,1.0\n" + "5,0.0";
+
+	private IncrementalSSSPData() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/JaccardSimilarityMeasureData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/JaccardSimilarityMeasureData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/JaccardSimilarityMeasureData.java
new file mode 100644
index 0000000..054f041
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/JaccardSimilarityMeasureData.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the Jaccard Similarity Measure example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class JaccardSimilarityMeasureData {
+
+	public static final String EDGES = "1 2\n" + "1 3\n" + "1 4\n" + "1 5\n" + "2 3\n" + "2 4\n" +
+			"2 5\n" + "3 4\n" + "3 5\n" + "4 5";
+
+	/** Creates the default edge set; mirrors {@link #EDGES}, with every edge value initialized to 0.0. */
+	public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, Double>> edges = new ArrayList<>();
+		edges.add(new Edge<>(1L, 2L, 0.0));
+		edges.add(new Edge<>(1L, 3L, 0.0));
+		edges.add(new Edge<>(1L, 4L, 0.0));
+		edges.add(new Edge<>(1L, 5L, 0.0));
+		edges.add(new Edge<>(2L, 3L, 0.0));
+		edges.add(new Edge<>(2L, 4L, 0.0));
+		edges.add(new Edge<>(2L, 5L, 0.0));
+		edges.add(new Edge<>(3L, 4L, 0.0));
+		edges.add(new Edge<>(3L, 5L, 0.0));
+		edges.add(new Edge<>(4L, 5L, 0.0));
+
+		return env.fromCollection(edges);
+	}
+
+	public static final String JACCARD_EDGES = "1,2,0.6\n" + "1,3,0.6\n" + "1,4,0.6\n" + "1,5,0.6\n" +
+			"2,3,0.6\n" + "2,4,0.6\n" + "2,5,0.6\n" + "3,4,0.6\n" + "3,5,0.6\n" + "4,5,0.6";
+
+	private JaccardSimilarityMeasureData() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/LabelPropagationData.java
----------------------------------------------------------------------
diff --git
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/LabelPropagationData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/LabelPropagationData.java
new file mode 100644
index 0000000..8decb24
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/LabelPropagationData.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+
+/**
+ * Provides the default data set used for the Label Propagation test program.
+ * If no parameters are given to the program, the default edge data set is used.
+ */
+public class LabelPropagationData {
+
+	public static final String LABELS_AFTER_1_ITERATION = "1,10\n" +
+		"2,10\n" +
+		"3,10\n" +
+		"4,40\n" +
+		"5,40\n" +
+		"6,40\n" +
+		"7,40\n";
+
+	public static final String LABELS_WITH_TIE = "1,10\n" +
+		"2,10\n" +
+		"3,10\n" +
+		"4,10\n" +
+		"5,20\n" +
+		"6,20\n" +
+		"7,20\n" +
+		"8,20\n" +
+		"9,20\n";
+
+	private LabelPropagationData() {}
+
+	/** Creates the default vertex set (vertex ids 1-7, with long vertex values). */
+	public static final DataSet<Vertex<Long, Long>> getDefaultVertexSet(ExecutionEnvironment env) {
+
+		List<Vertex<Long, Long>> vertices = new ArrayList<>();
+		vertices.add(new Vertex<>(1L, 10L));
+		vertices.add(new Vertex<>(2L, 10L));
+		vertices.add(new Vertex<>(3L, 30L));
+		vertices.add(new Vertex<>(4L, 40L));
+		vertices.add(new Vertex<>(5L, 40L));
+		vertices.add(new Vertex<>(6L, 40L));
+		vertices.add(new Vertex<>(7L, 40L));
+
+		return env.fromCollection(vertices);
+	}
+
+	/** Creates the default edge set; edges carry no value ({@link NullValue}). */
+	public static final DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, NullValue>> edges = new ArrayList<>();
+		edges.add(new Edge<>(1L, 3L, NullValue.getInstance()));
+		edges.add(new Edge<>(2L, 3L, NullValue.getInstance()));
+		edges.add(new Edge<>(4L, 7L, NullValue.getInstance()));
+		edges.add(new Edge<>(5L, 7L, NullValue.getInstance()));
+		edges.add(new Edge<>(6L, 7L, NullValue.getInstance()));
+		edges.add(new Edge<>(7L, 3L, NullValue.getInstance()));
+
+		return env.fromCollection(edges);
+	}
+
+	/** Creates the vertex set for the tie scenario (vertex ids 1-9). */
+	public static final DataSet<Vertex<Long, Long>> getTieVertexSet(ExecutionEnvironment env) {
+
+		List<Vertex<Long, Long>> vertices = new ArrayList<>();
+		vertices.add(new Vertex<>(1L, 10L));
+		vertices.add(new Vertex<>(2L, 10L));
+		vertices.add(new Vertex<>(3L, 10L));
+		vertices.add(new Vertex<>(4L, 10L));
+		vertices.add(new Vertex<>(5L, 0L));
+		vertices.add(new Vertex<>(6L, 20L));
+		vertices.add(new Vertex<>(7L, 20L));
+		vertices.add(new Vertex<>(8L, 20L));
+		vertices.add(new Vertex<>(9L, 20L));
+
+		return env.fromCollection(vertices);
+	}
+
+	/** Creates the edge set for the tie scenario; every edge points at vertex 5. */
+	public static final DataSet<Edge<Long, NullValue>> getTieEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, NullValue>> edges = new ArrayList<>();
+		edges.add(new Edge<>(1L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<>(2L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<>(4L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<>(5L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<>(6L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<>(7L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<>(8L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<>(9L, 5L, NullValue.getInstance()));
+
+		return env.fromCollection(edges);
+	}
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/MusicProfilesData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/MusicProfilesData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/MusicProfilesData.java
new file mode 100644
index 0000000..e4c98fe
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/MusicProfilesData.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+/**
+ * Provides the default data sets used for the Music Profiles example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class MusicProfilesData {
+
+	/** Creates the default (user id, song id, integer count) triplets. */
+	public static DataSet<Tuple3<String, String, Integer>> getUserSongTriplets(ExecutionEnvironment env) {
+		List<Tuple3<String, String, Integer>> triplets = new ArrayList<>();
+
+		triplets.add(new Tuple3<>("user_1", "song_1", 100));
+		triplets.add(new Tuple3<>("user_1", "song_2", 10));
+		triplets.add(new Tuple3<>("user_1", "song_3", 20));
+		triplets.add(new Tuple3<>("user_1", "song_4", 30));
+		triplets.add(new Tuple3<>("user_1", "song_5", 1));
+
+		triplets.add(new Tuple3<>("user_2", "song_6", 40));
+		triplets.add(new Tuple3<>("user_2", "song_7", 10));
+		triplets.add(new Tuple3<>("user_2", "song_8", 3));
+
+		triplets.add(new Tuple3<>("user_3", "song_1", 100));
+		triplets.add(new Tuple3<>("user_3", "song_2", 10));
+		triplets.add(new Tuple3<>("user_3", "song_3", 20));
+		triplets.add(new Tuple3<>("user_3", "song_8", 30));
+		triplets.add(new Tuple3<>("user_3", "song_9", 1));
+		triplets.add(new Tuple3<>("user_3", "song_10", 8));
+		triplets.add(new Tuple3<>("user_3", "song_11", 90));
+		triplets.add(new Tuple3<>("user_3", "song_12", 30));
+		triplets.add(new Tuple3<>("user_3", "song_13", 34));
+		triplets.add(new Tuple3<>("user_3", "song_14", 17));
+
+		triplets.add(new Tuple3<>("user_4", "song_1", 100));
+		triplets.add(new Tuple3<>("user_4", "song_6", 10));
+		triplets.add(new Tuple3<>("user_4", "song_8", 20));
+		triplets.add(new Tuple3<>("user_4", "song_12", 30));
+		triplets.add(new Tuple3<>("user_4", "song_13", 1));
+		triplets.add(new Tuple3<>("user_4", "song_15", 1));
+
+		triplets.add(new Tuple3<>("user_5", "song_3", 300));
+		triplets.add(new Tuple3<>("user_5", "song_4", 4));
+		triplets.add(new Tuple3<>("user_5", "song_5", 5));
+		triplets.add(new Tuple3<>("user_5", "song_8", 8));
+		triplets.add(new Tuple3<>("user_5", "song_9", 9));
+		triplets.add(new Tuple3<>("user_5", "song_10", 10));
+		triplets.add(new Tuple3<>("user_5", "song_12", 12));
+		triplets.add(new Tuple3<>("user_5", "song_13", 13));
+		triplets.add(new Tuple3<>("user_5", "song_15", 15));
+
+		triplets.add(new Tuple3<>("user_6", "song_6", 30));
+
+		return env.fromCollection(triplets);
+	}
+
+	/** Creates the default set of mismatch error lines. */
+	public static DataSet<String> getMismatches(ExecutionEnvironment env) {
+		List<String> errors = new ArrayList<>();
+		errors.add("ERROR: <song_8 track_8> Sever");
+		errors.add("ERROR: <song_15 track_15> Black Trees");
+		return env.fromCollection(errors);
+	}
+
+	public static final String USER_SONG_TRIPLETS = "user_1 song_1 100\n" + "user_1 song_5 200\n"
+			+ "user_2 song_1 10\n" + "user_2 song_4 20\n"
+			+ "user_3 song_2 3\n"
+			+ "user_4 song_2 1\n" + "user_4 song_3 2\n"
+			+ "user_5 song_3 30";
+
+	public static final String MISMATCHES = "ERROR: <song_5 track_8> Angie";
+
+	public static final String MAX_ITERATIONS = "2";
+
+	public static final String TOP_SONGS_RESULT = "user_1 song_1\n"
+					+ "user_2 song_4\n"
+					+ "user_3 song_2\n"
+					+ "user_4 song_3\n"
+					+ "user_5 song_3";
+
+	public static final String COMMUNITIES_RESULT = "user_1 1\n"
+			+ "user_2 1\n"
+			+ "user_3 3\n"
+			+ "user_4 3\n"
+			+ "user_5 4";
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/PageRankData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/PageRankData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/PageRankData.java
new file mode 100644
index 0000000..a45de88
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/PageRankData.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
+/**
+ * Provides the default data set used for the PageRank test program.
+ * If no parameters are given to the program, the default edge data set is used.
+ */
+public class PageRankData {
+
+	public static final String EDGES = "2 1\n" +
+		"5 2\n" +
+		"5 4\n" +
+		"4 3\n" +
+		"4 2\n" +
+		"1 4\n" +
+		"1 2\n" +
+		"1 3\n" +
+		"3 5\n";
+
+	public static final String RANKS_AFTER_3_ITERATIONS = "1,0.237\n" +
+		"2,0.248\n" +
+		"3,0.173\n" +
+		"4,0.175\n" +
+		"5,0.165\n";
+
+	private PageRankData() {}
+
+	/** Creates the default edge set; mirrors {@link #EDGES}, with every edge value set to 1.0. */
+	public static final DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, Double>> edges = new ArrayList<>();
+		edges.add(new Edge<>(2L, 1L, 1.0));
+		edges.add(new Edge<>(5L, 2L, 1.0));
+		edges.add(new Edge<>(5L, 4L, 1.0));
+		edges.add(new Edge<>(4L, 3L, 1.0));
+		edges.add(new Edge<>(4L, 2L, 1.0));
+		edges.add(new Edge<>(1L, 4L, 1.0));
+		edges.add(new Edge<>(1L, 2L, 1.0));
+		edges.add(new Edge<>(1L, 3L, 1.0));
+		edges.add(new Edge<>(3L, 5L, 1.0));
+
+		return env.fromCollection(edges);
+	}
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SingleSourceShortestPathsData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SingleSourceShortestPathsData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SingleSourceShortestPathsData.java
new file mode 100644
index 0000000..75b4484
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SingleSourceShortestPathsData.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
+/**
+ * Provides the default data set used for the Single Source Shortest Paths example program.
+ * If no parameters are given to the program, the default edge data set is used.
+ */
+public class SingleSourceShortestPathsData {
+
+	public static final Long SRC_VERTEX_ID = 1L;
+
+	public static final String EDGES = "1\t2\t12.0\n" + "1\t3\t13.0\n" + "2\t3\t23.0\n" + "3\t4\t34.0\n" + "3\t5\t35.0\n" +
+			"4\t5\t45.0\n" + "5\t1\t51.0";
+
+	public static final Object[][] DEFAULT_EDGES = new Object[][] {
+		new Object[]{1L, 2L, 12.0},
+		new Object[]{1L, 3L, 13.0},
+		new Object[]{2L, 3L, 23.0},
+		new Object[]{3L, 4L, 34.0},
+		new Object[]{3L, 5L, 35.0},
+		new Object[]{4L, 5L, 45.0},
+		new Object[]{5L, 1L, 51.0}
+	};
+
+	public static final String RESULTED_SINGLE_SOURCE_SHORTEST_PATHS = "1,0.0\n" + "2,12.0\n" + "3,13.0\n" +
+			"4,47.0\n" + "5,48.0";
+
+	/** Converts each {source, target, value} row of {@link #DEFAULT_EDGES} into an edge data set. */
+	public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, Double>> edgeList = new ArrayList<>(DEFAULT_EDGES.length);
+		for (Object[] edge : DEFAULT_EDGES) {
+			edgeList.add(new Edge<>((Long) edge[0], (Long) edge[1], (Double) edge[2]));
+		}
+		return env.fromCollection(edgeList);
+	}
+
+	private SingleSourceShortestPathsData() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SummarizationData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SummarizationData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SummarizationData.java
new file mode 100644
index 0000000..c14d5de
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SummarizationData.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import com.google.common.collect.Lists;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+
+import java.util.List;
+
+/**
+ * Default vertex and edge data sets, plus expected results, for the Summarization tests.
+ */
+public class SummarizationData {
+
+	private SummarizationData() {}
+
+	/**
+	 * Expected summarized vertices. A summarized vertex may carry the id of any vertex in its group.
+	 *
+	 * <p>Format:
+	 *
+	 * {@code possible-id[,possible-id];group-value,group-count}
+	 */
+	public static final String[] EXPECTED_VERTICES = {
+		"0,1;A,2",
+		"2,3,4;B,3",
+		"5;C,1"
+	};
+
+	/**
+	 * Expected summarized edges when edges carry {@link String} values. Format:
+	 *
+	 * {@code possible-source-id[,possible-source-id];possible-target-id[,possible-target-id];group-value,group-count}
+	 */
+	public static final String[] EXPECTED_EDGES_WITH_VALUES = {
+		"0,1;0,1;A,2",
+		"0,1;2,3,4;A,1",
+		"2,3,4;0,1;A,1",
+		"2,3,4;0,1;C,2",
+		"2,3,4;2,3,4;B,2",
+		"5;2,3,4;D,2"
+	};
+
+	/**
+	 * Expected summarized edges when edges carry no value ({@link NullValue}). Format:
+	 *
+	 * {@code possible-source-id[,possible-source-id];possible-target-id[,possible-target-id];group-value,group-count}
+	 */
+	public static final String[] EXPECTED_EDGES_ABSENT_VALUES = {
+		"0,1;0,1;(null),2",
+		"0,1;2,3,4;(null),1",
+		"2,3,4;0,1;(null),3",
+		"2,3,4;2,3,4;(null),2",
+		"5;2,3,4;(null),2"
+	};
+
+	/**
+	 * Builds six vertices (ids 0-5) whose {@link String} values define their groups.
+	 *
+	 * @param env environment the data set is created in
+	 * @return the vertex data set
+	 */
+	public static DataSet<Vertex<Long, String>> getVertices(ExecutionEnvironment env) {
+		List<Vertex<Long, String>> vertexList = Lists.newArrayListWithExpectedSize(6);
+		vertexList.add(new Vertex<>(0L, "A"));
+		vertexList.add(new Vertex<>(1L, "A"));
+		vertexList.add(new Vertex<>(2L, "B"));
+		vertexList.add(new Vertex<>(3L, "B"));
+		vertexList.add(new Vertex<>(4L, "B"));
+		vertexList.add(new Vertex<>(5L, "C"));
+
+		return env.fromCollection(vertexList);
+	}
+
+	/**
+	 * Builds ten directed edges, each carrying a {@link String} value.
+	 *
+	 * @param env environment the data set is created in
+	 * @return the edge data set
+	 */
+	public static DataSet<Edge<Long, String>> getEdges(ExecutionEnvironment env) {
+		List<Edge<Long, String>> edgeList = Lists.newArrayListWithExpectedSize(10);
+		edgeList.add(new Edge<>(0L, 1L, "A"));
+		edgeList.add(new Edge<>(1L, 0L, "A"));
+		edgeList.add(new Edge<>(1L, 2L, "A"));
+		edgeList.add(new Edge<>(2L, 1L, "A"));
+		edgeList.add(new Edge<>(2L, 3L, "B"));
+		edgeList.add(new Edge<>(3L, 2L, "B"));
+		edgeList.add(new Edge<>(4L, 0L, "C"));
+		edgeList.add(new Edge<>(4L, 1L, "C"));
+		edgeList.add(new Edge<>(5L, 2L, "D"));
+		edgeList.add(new Edge<>(5L, 3L, "D"));
+		return env.fromCollection(edgeList);
+	}
+
+	/**
+	 * Same topology as {@link #getEdges}, but every edge value is replaced by {@link NullValue}.
+	 *
+	 * @param env environment the data set is created in
+	 * @return the edge data set without meaningful values
+	 */
+	@SuppressWarnings("serial")
+	public static DataSet<Edge<Long, NullValue>> getEdgesWithAbsentValues(ExecutionEnvironment env) {
+		DataSet<Edge<Long, String>> valuedEdges = getEdges(env);
+		return valuedEdges.map(new MapFunction<Edge<Long, String>, Edge<Long, NullValue>>() {
+			@Override
+			public Edge<Long, NullValue> map(Edge<Long, String> edge) throws Exception {
+				return new Edge<>(edge.getSource(), edge.getTarget(), NullValue.getInstance());
+			}
+		});
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
new file mode 100644
index 0000000..71b874c
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.examples.data; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.types.NullValue; + +import java.util.ArrayList; +import java.util.List; + +/** + * Provides the default data sets used for the Triangle Count test program. + * If no parameters are given to the program, the default data sets are used. 
+ */ +public class TriangleCountData { + + public static final String EDGES = "1 2\n"+"1 3\n"+"2 3\n"+"2 6\n"+"3 4\n"+"3 5\n"+"3 6\n"+"4 5\n"+"6 7\n"; + + public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) { + + List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>(); + edges.add(new Edge<Long, NullValue>(1L, 2L, NullValue.getInstance())); + edges.add(new Edge<Long, NullValue>(1L, 3L, NullValue.getInstance())); + edges.add(new Edge<Long, NullValue>(2L, 3L, NullValue.getInstance())); + edges.add(new Edge<Long, NullValue>(2L, 6L, NullValue.getInstance())); + edges.add(new Edge<Long, NullValue>(3L, 4L, NullValue.getInstance())); + edges.add(new Edge<Long, NullValue>(3L, 5L, NullValue.getInstance())); + edges.add(new Edge<Long, NullValue>(3L, 6L, NullValue.getInstance())); + edges.add(new Edge<Long, NullValue>(4L, 5L, NullValue.getInstance())); + edges.add(new Edge<Long, NullValue>(6L, 7L, NullValue.getInstance())); + + return env.fromCollection(edges); + } + + public static final String RESULTED_NUMBER_OF_TRIANGLES = "3"; + + public static List<Tuple3<Long,Long,Long>> getListOfTriangles() { + ArrayList<Tuple3<Long,Long,Long>> ret = new ArrayList<>(3); + ret.add(new Tuple3<>(1L,2L,3L)); + ret.add(new Tuple3<>(2L,3L,6L)); + ret.add(new Tuple3<>(3L,4L,5L)); + return ret; + } + + private TriangleCountData () {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java new file mode 100644 index 0000000..b1bc831 --- /dev/null +++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.examples.utils; + +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; + +public class ExampleUtils { + + @SuppressWarnings({ "serial", "unchecked", "rawtypes" }) + public static void printResult(DataSet set, String msg) { + set.output(new PrintingOutputFormatWithMessage(msg) { + }); + } + + public static class PrintingOutputFormatWithMessage<T> implements + OutputFormat<T> { + + private static final long serialVersionUID = 1L; + + private transient PrintStream stream; + + private transient String 
prefix; + + private String message; + + // -------------------------------------------------------------------------------------------- + + /** + * Instantiates a printing output format that prints to standard out. + */ + public PrintingOutputFormatWithMessage() { + } + + public PrintingOutputFormatWithMessage(String msg) { + this.message = msg; + } + + @Override + public void open(int taskNumber, int numTasks) { + // get the target stream + this.stream = System.out; + + // set the prefix to message + this.prefix = message + ": "; + } + + @Override + public void writeRecord(T record) { + if (this.prefix != null) { + this.stream.println(this.prefix + record.toString()); + } else { + this.stream.println(record.toString()); + } + } + + @Override + public void close() { + this.stream = null; + this.prefix = null; + } + + @Override + public String toString() { + return "Print to System.out"; + } + + @Override + public void configure(Configuration parameters) { + } + } + + @SuppressWarnings("serial") + public static DataSet<Vertex<Long, NullValue>> getVertexIds( + ExecutionEnvironment env, final long numVertices) { + return env.generateSequence(1, numVertices).map( + new MapFunction<Long, Vertex<Long, NullValue>>() { + public Vertex<Long, NullValue> map(Long l) { + return new Vertex<Long, NullValue>(l, NullValue + .getInstance()); + } + }); + } + + @SuppressWarnings("serial") + public static DataSet<Edge<Long, NullValue>> getRandomEdges( + ExecutionEnvironment env, final long numVertices) { + return env.generateSequence(1, numVertices).flatMap( + new FlatMapFunction<Long, Edge<Long, NullValue>>() { + @Override + public void flatMap(Long key, Collector<Edge<Long, NullValue>> out) throws Exception { + int numOutEdges = (int) (Math.random() * (numVertices / 2)); + for (int i = 0; i < numOutEdges; i++) { + long target = (long) (Math.random() * numVertices) + 1; + out.collect(new Edge<Long, NullValue>(key, target, + NullValue.getInstance())); + } + } + }); + } + + public 
static DataSet<Vertex<Long, Double>> getLongDoubleVertexData( + ExecutionEnvironment env) { + List<Vertex<Long, Double>> vertices = new ArrayList<Vertex<Long, Double>>(); + vertices.add(new Vertex<Long, Double>(1L, 1.0)); + vertices.add(new Vertex<Long, Double>(2L, 2.0)); + vertices.add(new Vertex<Long, Double>(3L, 3.0)); + vertices.add(new Vertex<Long, Double>(4L, 4.0)); + vertices.add(new Vertex<Long, Double>(5L, 5.0)); + + return env.fromCollection(vertices); + } + + public static DataSet<Edge<Long, Double>> getLongDoubleEdgeData( + ExecutionEnvironment env) { + List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>(); + edges.add(new Edge<Long, Double>(1L, 2L, 12.0)); + edges.add(new Edge<Long, Double>(1L, 3L, 13.0)); + edges.add(new Edge<Long, Double>(2L, 3L, 23.0)); + edges.add(new Edge<Long, Double>(3L, 4L, 34.0)); + edges.add(new Edge<Long, Double>(3L, 5L, 35.0)); + edges.add(new Edge<Long, Double>(4L, 5L, 45.0)); + edges.add(new Edge<Long, Double>(5L, 1L, 51.0)); + + return env.fromCollection(edges); + } + + /** + * Private constructor to prevent instantiation. + */ + private ExampleUtils() { + throw new RuntimeException(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/ConnectedComponents.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/ConnectedComponents.scala b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/ConnectedComponents.scala new file mode 100644 index 0000000..704d476 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/ConnectedComponents.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.scala.examples + +import org.apache.flink.api.scala._ +import org.apache.flink.graph.library.GSAConnectedComponents +import org.apache.flink.graph.scala._ +import org.apache.flink.graph.Edge +import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData +import org.apache.flink.types.NullValue +import org.apache.flink.api.common.functions.MapFunction +import java.lang.Long + +/** + * This example shows how to use Gelly's library methods. + * You can find all available library methods in [[org.apache.flink.graph.library]]. + * + * In particular, this example uses the + * [[GSAConnectedComponents]] + * library method to compute the connected components of the input graph. + * + * The input file is a plain text file and must be formatted as follows: + * Edges are represented by tuples of srcVertexId, trgVertexId which are + * separated by tabs. Edges themselves are separated by newlines. + * For example: <code>1\t2\n1\t3\n</code> defines two edges, + * 1-2 and 1-3. 
+ * + * Usage {{ + * ConnectedComponents <edge path> <result path> <number of iterations> + * }} + * If no parameters are provided, the program is run with default data from + * [[ConnectedComponentsDefaultData]] + */ +object ConnectedComponents { + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + val env = ExecutionEnvironment.getExecutionEnvironment + val edges: DataSet[Edge[Long, NullValue]] = getEdgesDataSet(env) + val graph = Graph.fromDataSet[Long, Long, NullValue](edges, new InitVertices, env) + + val components = graph.run(new GSAConnectedComponents[Long, NullValue](maxIterations)) + + + // emit result + if (fileOutput) { + components.writeAsCsv(outputPath, "\n", ",") + env.execute("Connected Components Example") + } else { + components.print() + } + } + + private final class InitVertices extends MapFunction[Long, Long] { + override def map(id: Long) = id + } + + // *********************************************************************** + // UTIL METHODS + // *********************************************************************** + + private var fileOutput = false + private var edgesInputPath: String = null + private var outputPath: String = null + private var maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS + + private def parseParameters(args: Array[String]): Boolean = { + if(args.length > 0) { + if(args.length != 3) { + System.err.println("Usage ConnectedComponents <edge path> <output path> " + + "<num iterations>") + } + fileOutput = true + edgesInputPath = args(0) + outputPath = args(1) + maxIterations = 2 + } else { + System.out.println("Executing ConnectedComponents example with default parameters" + + " and built-in default data.") + System.out.println(" Provide parameters to read input data from files.") + System.out.println(" See the documentation for the correct format of input files.") + System.out.println("Usage ConnectedComponents <edge path> <output path> " + + "<num iterations>") + } + true + } + 
+ private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = { + if (fileOutput) { + env.readCsvFile[(Long, Long)](edgesInputPath, + lineDelimiter = "\n", + fieldDelimiter = "\t") + .map(edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance)) + } else { + val edgeData = ConnectedComponentsDefaultData.DEFAULT_EDGES map { + case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long]) + } + env.fromCollection(edgeData).map( + edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance)) + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GSASingleSourceShortestPaths.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GSASingleSourceShortestPaths.scala b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GSASingleSourceShortestPaths.scala new file mode 100644 index 0000000..0a10ad7 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GSASingleSourceShortestPaths.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.scala.examples + +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.scala._ +import org.apache.flink.graph.Edge +import org.apache.flink.graph.gsa.{ApplyFunction, GatherFunction, Neighbor, SumFunction} +import org.apache.flink.graph.scala._ +import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap +import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData + +/** + * This example shows how to use Gelly's gather-sum-apply iterations. + * + * It is an implementation of the Single-Source-Shortest-Paths algorithm. + * + * The input file is a plain text file and must be formatted as follows: + * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are + * separated by tabs. Edges themselves are separated by newlines. + * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges, + * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4. 
+ * + * If no parameters are provided, the program is run with default data from + * [[SingleSourceShortestPathsData]] + */ +object GSASingleSourceShortestPaths { + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + val env = ExecutionEnvironment.getExecutionEnvironment + val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env) + val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env) + + // Execute the gather-sum-apply iteration + val result = graph.runGatherSumApplyIteration(new CalculateDistances, new ChooseMinDistance, + new UpdateDistance, maxIterations) + + // Extract the vertices as the result + val singleSourceShortestPaths = result.getVertices + + // emit result + if (fileOutput) { + singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",") + env.execute("GSA Single Source Shortest Paths Example") + } else { + singleSourceShortestPaths.print() + } + } + + // -------------------------------------------------------------------------------------------- + // Single Source Shortest Path UDFs + // -------------------------------------------------------------------------------------------- + + private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] { + + override def map(id: Long) = { + if (id.equals(srcId)) { + 0.0 + } else { + Double.PositiveInfinity + } + } + } + + private final class CalculateDistances extends GatherFunction[Double, Double, Double] { + override def gather(neighbor: Neighbor[Double, Double]) = { + neighbor.getNeighborValue + neighbor.getEdgeValue + } + } + + private final class ChooseMinDistance extends SumFunction[Double, Double, Double] { + override def sum(newValue: Double, currentValue: Double) = { + Math.min(newValue, currentValue) + } + } + + private final class UpdateDistance extends ApplyFunction[Long, Double, Double] { + override def apply(newDistance: Double, oldDistance: Double) = { + if (newDistance < oldDistance) { + 
setResult(newDistance) + } + } + } + + // ************************************************************************** + // UTIL METHODS + // ************************************************************************** + + private var fileOutput = false + private var srcVertexId = 1L + private var edgesInputPath: String = null + private var outputPath: String = null + private var maxIterations = 5 + + private def parseParameters(args: Array[String]): Boolean = { + if(args.length > 0) { + if(args.length != 4) { + System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" + + " <input edges path> <output path> <num iterations>") + } + fileOutput = true + srcVertexId = args(0).toLong + edgesInputPath = args(1) + outputPath = args(2) + maxIterations = 3 + } else { + System.out.println("Executing Single Source Shortest Paths example " + + "with default parameters and built-in default data.") + System.out.println(" Provide parameters to read input data from files.") + System.out.println(" See the documentation for the correct format of input files.") + System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" + + " <input edges path> <output path> <num iterations>") + } + true + } + + private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = { + if (fileOutput) { + env.readCsvFile[(Long, Long, Double)](edgesInputPath, + lineDelimiter = "\n", + fieldDelimiter = "\t") + .map(new Tuple3ToEdgeMap[Long, Double]()) + } else { + val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map { + case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long], + z.asInstanceOf[Double]) + } + env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]()) + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala new file mode 100644 index 0000000..f9fa82d --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.scala.examples + +import org.apache.flink.api.scala._ +import org.apache.flink.graph.scala._ +import org.apache.flink.types.NullValue +import org.apache.flink.graph.Edge +import org.apache.flink.util.Collector + +/** + * This example illustrates how to use Gelly metrics methods and get simple statistics + * from the input graph. 
+ * + * The program creates a random graph and computes and prints + * the following metrics: + * - number of vertices + * - number of edges + * - average node degree + * - the vertex ids with the max/min in- and out-degrees + * + * The input file is expected to contain one edge per line, + * with long IDs and no values, in the following format: + * {{{ + * <sourceVertexID>\t<targetVertexID> + * }}} + * If no arguments are provided, the example runs with a random graph of 100 vertices. + * + */ +object GraphMetrics { + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + val env = ExecutionEnvironment.getExecutionEnvironment + /** create the graph **/ + val graph: Graph[Long, NullValue, NullValue] = Graph.fromDataSet(getEdgeDataSet(env), env) + + /** get the number of vertices **/ + val numVertices = graph.numberOfVertices + + /** get the number of edges **/ + val numEdges = graph.numberOfEdges + + /** compute the average node degree **/ + val verticesWithDegrees = graph.getDegrees + val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2 / numVertices).toDouble) + + /** find the vertex with the maximum in-degree **/ + val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1) + + /** find the vertex with the minimum in-degree **/ + val minInDegreeVertex = graph.inDegrees.min(1).map(in => in._1) + + /** find the vertex with the maximum out-degree **/ + val maxOutDegreeVertex = graph.outDegrees.max(1).map(in => in._1) + + /** find the vertex with the minimum out-degree **/ + val minOutDegreeVertex = graph.outDegrees.min(1).map(in => in._1) + + /** print the results **/ + env.fromElements(numVertices).printOnTaskManager("Total number of vertices") + env.fromElements(numEdges).printOnTaskManager("Total number of edges") + avgDegree.printOnTaskManager("Average node degree") + maxInDegreeVertex.printOnTaskManager("Vertex with Max in-degree") + minInDegreeVertex.printOnTaskManager("Vertex with Max in-degree") + 
maxOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree") + minOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree") + + } + + private def parseParameters(args: Array[String]): Boolean = { + if (args.length > 0) { + fileOutput = true + if (args.length == 1) { + edgesPath = args(0) + true + } else { + System.err.println("Usage: GraphMetrics <edges path>") + false + } + } else { + System.out.println("Executing GraphMetrics example with built-in default data.") + System.out.println(" Provide parameters to read input data from a file.") + System.out.println(" Usage: GraphMetrics <edges path>") + true + } + } + + private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = { + if (fileOutput) { + env.readCsvFile[(Long, Long)]( + edgesPath, + fieldDelimiter = "\t").map( + in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance())) + } else { + env.generateSequence(1, numVertices).flatMap[Edge[Long, NullValue]]( + (key: Long, out: Collector[Edge[Long, NullValue]]) => { + val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt + for ( i <- 0 to numOutEdges ) { + val target: Long = ((Math.random() * numVertices) + 1).toLong + new Edge[Long, NullValue](key, target, NullValue.getInstance()) + } + }) + } + } + + private var fileOutput: Boolean = false + private var edgesPath: String = null + private var outputPath: String = null + private val numVertices = 100 +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala new file mode 100644 index 0000000..4f84bb0 
--- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.scala.examples + +import org.apache.flink.api.scala._ +import org.apache.flink.graph.scala._ +import org.apache.flink.graph.Edge +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.graph.spargel.VertexUpdateFunction +import org.apache.flink.graph.spargel.MessageIterator +import org.apache.flink.graph.Vertex +import org.apache.flink.graph.spargel.MessagingFunction +import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData +import scala.collection.JavaConversions._ +import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap + +/** + * This example shows how to use Gelly's scatter-gather iterations. + * + * It is an implementation of the Single-Source-Shortest-Paths algorithm. + * + * The input file is a plain text file and must be formatted as follows: + * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are + * separated by tabs. Edges themselves are separated by newlines. 
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * If no parameters are provided, the program is run with default data from
+ * [[SingleSourceShortestPathsData]]
+ */
+object SingleSourceShortestPaths {
+
+  /**
+   * Parses the arguments, runs a scatter-gather iteration for at most `maxIterations`
+   * iterations and either writes the resulting vertex distances as CSV to `outputPath`
+   * or prints them, depending on whether file parameters were given.
+   */
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
+    val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
+
+    // Execute the scatter-gather iteration
+    val result = graph.runScatterGatherIteration(new VertexDistanceUpdater,
+      new MinDistanceMessenger, maxIterations)
+
+    // Extract the vertices as the result
+    val singleSourceShortestPaths = result.getVertices
+
+    // emit result
+    if (fileOutput) {
+      singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
+      env.execute("Single Source Shortest Paths Example")
+    } else {
+      singleSourceShortestPaths.print()
+    }
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Single Source Shortest Path UDFs
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Initializes the vertex value: 0.0 for the source vertex `srcId`,
+   * positive infinity (not yet reached) for every other vertex.
+   */
+  private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
+
+    override def map(id: Long) = {
+      if (id == srcId) {
+        0.0
+      } else {
+        Double.PositiveInfinity
+      }
+    }
+  }
+
+  /**
+   * Function that updates the value of a vertex by picking the minimum
+   * distance from all incoming messages.
+   */
+  private final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Double] {
+
+    override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) {
+      var minDistance = Double.MaxValue
+      while (inMessages.hasNext) {
+        val msg = inMessages.next
+        if (msg < minDistance) {
+          minDistance = msg
+        }
+      }
+      // Only set a new value when it improves the current distance.
+      if (vertex.getValue > minDistance) {
+        setNewVertexValue(minDistance)
+      }
+    }
+  }
+
+  /**
+   * Distributes the minimum distance associated with a given vertex among all
+   * the target vertices summed up with the edge's value.
+   */
+  private final class MinDistanceMessenger extends
+    MessagingFunction[Long, Double, Double, Double] {
+
+    override def sendMessages(vertex: Vertex[Long, Double]) {
+      // A vertex that has not been reached yet (infinite distance) has nothing to propagate.
+      if (vertex.getValue < Double.PositiveInfinity) {
+        for (edge: Edge[Long, Double] <- getEdges) {
+          sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
+        }
+      }
+    }
+  }
+
+  // ****************************************************************************
+  //     UTIL METHODS
+  // ****************************************************************************
+
+  // Program parameters; overwritten by parseParameters when arguments are supplied.
+  private var fileOutput = false
+  private var srcVertexId = 1L
+  private var edgesInputPath: String = null
+  private var outputPath: String = null
+  private var maxIterations = 5
+
+  /**
+   * Reads `<source vertex id> <input edges path> <output path> <num iterations>`.
+   * With no arguments the built-in default data and default parameters are used.
+   *
+   * @return false if arguments were given but their number is not 4, true otherwise
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      if (args.length != 4) {
+        System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+          " <input edges path> <output path> <num iterations>")
+        // Previously execution continued here and failed on missing arguments.
+        return false
+      }
+      fileOutput = true
+      srcVertexId = args(0).toLong
+      edgesInputPath = args(1)
+      outputPath = args(2)
+      // Honor the documented <num iterations> argument instead of a hard-coded value.
+      maxIterations = args(3).toInt
+    } else {
+      System.out.println("Executing Single Source Shortest Paths example "
+        + "with default parameters and built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+        " <input edges path> <output path> <num iterations>")
+    }
+    true
+  }
+
+  /**
+   * Reads tab-separated (source, target, distance) edges from `edgesInputPath` when file
+   * parameters were given, otherwise builds them from SingleSourceShortestPathsData.DEFAULT_EDGES.
+   */
+  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long, Double)](edgesInputPath,
+        lineDelimiter = "\n",
+        fieldDelimiter = "\t")
+        .map(new Tuple3ToEdgeMap[Long, Double]())
+    } else {
+      val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
+        case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
+          z.asInstanceOf[Double])
+      }
+      env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/CommunityDetectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/CommunityDetectionITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/CommunityDetectionITCase.java
new file mode 100644
index 0000000..cd8af9b
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/CommunityDetectionITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.data.CommunityDetectionData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Integration tests for the {@link CommunityDetection} library algorithm. The class is
+ * parameterized so that each test runs once per execution mode of {@link MultipleProgramsTestBase}.
+ */
+@RunWith(Parameterized.class)
+public class CommunityDetectionITCase extends MultipleProgramsTestBase {
+
+	public CommunityDetectionITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testSingleIteration() throws Exception {
+		/*
+		 * Test one iteration of the Simple Community Detection Example
+		 */
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Double> inputGraph = Graph.fromDataSet(
+				CommunityDetectionData.getSimpleEdgeDataSet(env), new InitLabels(), env);
+
+		List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection<Long>(1, CommunityDetectionData.DELTA))
+				.getVertices().collect();
+
+		compareResultAsTuples(result, CommunityDetectionData.COMMUNITIES_SINGLE_ITERATION);
+	}
+
+	@Test
+	public void testTieBreaker() throws Exception {
+		/*
+		 * Test one iteration of the Simple Community Detection Example where a tie must be broken
+		 */
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Double> inputGraph = Graph.fromDataSet(
+				CommunityDetectionData.getTieEdgeDataSet(env), new InitLabels(), env);
+
+		List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection<Long>(1, CommunityDetectionData.DELTA))
+				.getVertices().collect();
+
+		compareResultAsTuples(result, CommunityDetectionData.COMMUNITIES_WITH_TIE);
+	}
+
+	/**
+	 * Initializes the label of every vertex to its own vertex id.
+	 */
+	@SuppressWarnings("serial")
+	private static final class InitLabels implements MapFunction<Long, Long> {
+
+		@Override
+		public Long map(Long id) {
+			return id;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/LabelPropagationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/LabelPropagationITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/LabelPropagationITCase.java
new file mode 100644
index 0000000..8b9234b
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/LabelPropagationITCase.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.data.LabelPropagationData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Integration tests for the {@link LabelPropagation} library algorithm. The class is
+ * parameterized so that each test runs once per execution mode of {@link MultipleProgramsTestBase}.
+ */
+@RunWith(Parameterized.class)
+public class LabelPropagationITCase extends MultipleProgramsTestBase {
+
+	public LabelPropagationITCase(TestExecutionMode mode){
+		super(mode);
+	}
+
+	@Test
+	public void testSingleIteration() throws Exception {
+		/*
+		 * Test one iteration of label propagation example with a simple graph
+		 */
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
+				LabelPropagationData.getDefaultVertexSet(env),
+				LabelPropagationData.getDefaultEdgeDataSet(env), env);
+
+		List<Vertex<Long, Long>> result = inputGraph
+				.run(new LabelPropagation<Long, Long, NullValue>(1))
+				.collect();
+
+		compareResultAsTuples(result, LabelPropagationData.LABELS_AFTER_1_ITERATION);
+	}
+
+	@Test
+	public void testTieBreaker() throws Exception {
+		/*
+		 * Test the label propagation example where a tie must be broken
+		 */
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
+				LabelPropagationData.getTieVertexSet(env),
+				LabelPropagationData.getTieEdgeDataSet(env), env);
+
+		List<Vertex<Long, Long>> result = inputGraph
+				.run(new LabelPropagation<Long, Long, NullValue>(1))
+				.collect();
+
+		compareResultAsTuples(result, LabelPropagationData.LABELS_WITH_TIE);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java
new file mode 100644
index 0000000..034bcd5
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.data.PageRankData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Integration tests for the {@link PageRank} and {@link GSAPageRank} library algorithms,
+ * comparing the ranks after three iterations against {@link PageRankData#RANKS_AFTER_3_ITERATIONS}.
+ */
+@RunWith(Parameterized.class)
+public class PageRankITCase extends MultipleProgramsTestBase {
+
+	public PageRankITCase(TestExecutionMode mode){
+		super(mode);
+	}
+
+	@Test
+	public void testPageRankWithThreeIterations() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+		List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 3))
+				.collect();
+
+		compareWithDelta(result, 0.01);
+	}
+
+	@Test
+	public void testGSAPageRankWithThreeIterations() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+		List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 3))
+				.collect();
+
+		compareWithDelta(result, 0.01);
+	}
+
+	@Test
+	public void testPageRankWithThreeIterationsAndNumOfVertices() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+		List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 5, 3))
+				.collect();
+
+		compareWithDelta(result, 0.01);
+	}
+
+	@Test
+	public void testGSAPageRankWithThreeIterationsAndNumOfVertices() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+		List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 5, 3))
+				.collect();
+
+		compareWithDelta(result, 0.01);
+	}
+
+	/**
+	 * Asserts that {@code result} contains exactly the vertices listed in
+	 * {@link PageRankData#RANKS_AFTER_3_ITERATIONS} and that every rank is within
+	 * {@code delta} of the expected rank of the same vertex.
+	 *
+	 * @param result the (id, rank) vertices produced by the algorithm under test
+	 * @param delta the maximum permitted absolute difference between two ranks (exclusive)
+	 */
+	private void compareWithDelta(List<Vertex<Long, Double>> result,
+			double delta) {
+
+		// Render every vertex as "id,rank", the same layout as the expected data.
+		String[] resultArray = new String[result.size()];
+		int index = 0;
+		for (Vertex<Long, Double> v : result) {
+			resultArray[index++] = v.f0 + "," + v.f1;
+		}
+
+		String expectedResult = PageRankData.RANKS_AFTER_3_ITERATIONS;
+		String[] expected = expectedResult.isEmpty() ? new String[0] : expectedResult.split("\n");
+
+		Arrays.sort(expected);
+		Arrays.sort(resultArray);
+
+		// Without this check a missing vertex surfaces as an ArrayIndexOutOfBoundsException
+		// and an extra vertex is silently ignored.
+		Assert.assertEquals("Unexpected number of vertices", expected.length, resultArray.length);
+
+		for (int i = 0; i < expected.length; i++) {
+			String[] expectedFields = expected[i].split(",");
+			String[] resultFields = resultArray[i].split(",");
+
+			// Lines are sorted by their "id," prefix, so equal id sets line up position by position.
+			Assert.assertEquals("Vertex ids differ",
+					Long.parseLong(expectedFields[0].trim()), Long.parseLong(resultFields[0].trim()));
+
+			double expectedPayLoad = Double.parseDouble(expectedFields[1]);
+			double resultPayLoad = Double.parseDouble(resultFields[1]);
+
+			Assert.assertTrue("Values differ by more than the permissible delta for vertex "
+					+ expectedFields[0] + ": expected " + expectedPayLoad + ", got " + resultPayLoad,
+					Math.abs(expectedPayLoad - resultPayLoad) < delta);
+		}
+	}
+
+	/**
+	 * Assigns an initial value of 1.0 to every vertex.
+	 */
+	@SuppressWarnings("serial")
+	private static final class InitMapper implements MapFunction<Long, Double> {
+		@Override
+		public Double map(Long value) {
+			return 1.0;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java new file mode 100644 index 0000000..17ddcfa --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.graph.library;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.data.SummarizationData;
+import org.apache.flink.graph.library.Summarization.EdgeValue;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for the {@link Summarization} library algorithm, with and without edge values.
+ */
+@RunWith(Parameterized.class)
+public class SummarizationITCase extends MultipleProgramsTestBase {
+
+	// Separates the fields of one expected vertex / edge line.
+	private static final Pattern TOKEN_SEPARATOR = Pattern.compile(";");
+
+	// Separates the entries inside one field (candidate ids, or group value and count).
+	private static final Pattern ID_SEPARATOR = Pattern.compile(",");
+
+	public SummarizationITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testWithVertexAndEdgeValues() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, String, String> input = Graph.fromDataSet(
+				SummarizationData.getVertices(env),
+				SummarizationData.getEdges(env),
+				env
+		);
+
+		List<Vertex<Long, Summarization.VertexValue<String>>> summarizedVertices = Lists.newArrayList();
+		List<Edge<Long, EdgeValue<String>>> summarizedEdges = Lists.newArrayList();
+
+		Graph<Long, Summarization.VertexValue<String>, EdgeValue<String>> output =
+				input.run(new Summarization<Long, String, String>());
+
+		output.getVertices().output(new LocalCollectionOutputFormat<>(summarizedVertices));
+		output.getEdges().output(new LocalCollectionOutputFormat<>(summarizedEdges));
+
+		env.execute();
+
+		validateVertices(SummarizationData.EXPECTED_VERTICES, summarizedVertices);
+		validateEdges(SummarizationData.EXPECTED_EDGES_WITH_VALUES, summarizedEdges);
+	}
+
+	@Test
+	public void testWithVertexAndAbsentEdgeValues() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, String, NullValue> input = Graph.fromDataSet(
+				SummarizationData.getVertices(env),
+				SummarizationData.getEdgesWithAbsentValues(env),
+				env
+		);
+
+		List<Vertex<Long, Summarization.VertexValue<String>>> summarizedVertices = Lists.newArrayList();
+		List<Edge<Long, EdgeValue<NullValue>>> summarizedEdges = Lists.newArrayList();
+
+		Graph<Long, Summarization.VertexValue<String>, EdgeValue<NullValue>> output =
+				input.run(new Summarization<Long, String, NullValue>());
+
+		output.getVertices().output(new LocalCollectionOutputFormat<>(summarizedVertices));
+		output.getEdges().output(new LocalCollectionOutputFormat<>(summarizedEdges));
+
+		env.execute();
+
+		validateVertices(SummarizationData.EXPECTED_VERTICES, summarizedVertices);
+		validateEdges(SummarizationData.EXPECTED_EDGES_ABSENT_VALUES, summarizedEdges);
+	}
+
+	/**
+	 * Checks that the summarized vertices match the expected lines one-to-one after sorting.
+	 * The expected array itself is not modified (it is shared test data).
+	 */
+	private void validateVertices(String[] expectedVertices,
+			List<Vertex<Long, Summarization.VertexValue<String>>> actualVertices) {
+		assertEquals("Unexpected number of vertices", expectedVertices.length, actualVertices.size());
+
+		String[] sortedExpected = expectedVertices.clone();
+		Arrays.sort(sortedExpected);
+		Collections.sort(actualVertices, new Comparator<Vertex<Long, Summarization.VertexValue<String>>>() {
+			@Override
+			public int compare(Vertex<Long, Summarization.VertexValue<String>> o1,
+					Vertex<Long, Summarization.VertexValue<String>> o2) {
+				int result = o1.getId().compareTo(o2.getId());
+				if (result == 0) {
+					result = o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue());
+				}
+				if (result == 0) {
+					// Previously the group value was compared three times and the count never.
+					result = o1.getValue().getVertexGroupCount().compareTo(o2.getValue().getVertexGroupCount());
+				}
+				return result;
+			}
+		});
+
+		for (int i = 0; i < sortedExpected.length; i++) {
+			validateVertex(sortedExpected[i], actualVertices.get(i));
+		}
+	}
+
+	/**
+	 * Checks that the summarized edges match the expected lines one-to-one after sorting.
+	 * The expected array itself is not modified (it is shared test data).
+	 */
+	private <EV extends Comparable<EV>> void validateEdges(String[] expectedEdges,
+			List<Edge<Long, EdgeValue<EV>>> actualEdges) {
+		assertEquals("Unexpected number of edges", expectedEdges.length, actualEdges.size());
+
+		String[] sortedExpected = expectedEdges.clone();
+		Arrays.sort(sortedExpected);
+		Collections.sort(actualEdges, new Comparator<Edge<Long, EdgeValue<EV>>> () {
+
+			@Override
+			public int compare(Edge<Long, EdgeValue<EV>> o1, Edge<Long, EdgeValue<EV>> o2) {
+				int result = o1.getSource().compareTo(o2.getSource());
+				if (result == 0) {
+					result = o1.getTarget().compareTo(o2.getTarget());
+				}
+				if (result == 0) {
+					result = o1.getValue().getEdgeGroupValue().compareTo(o2.getValue().getEdgeGroupValue());
+				}
+				if (result == 0) {
+					result = o1.getValue().getEdgeGroupCount().compareTo(o2.getValue().getEdgeGroupCount());
+				}
+				return result;
+			}
+		});
+
+		for (int i = 0; i < sortedExpected.length; i++) {
+			validateEdge(sortedExpected[i], actualEdges.get(i));
+		}
+	}
+
+	/**
+	 * Expected format: "candidate-id[,candidate-id...];group-value,group-count".
+	 */
+	private void validateVertex(String expected, Vertex<Long, Summarization.VertexValue<String>> actual) {
+		String[] tokens = TOKEN_SEPARATOR.split(expected);
+		assertTrue(getListFromIdRange(tokens[0]).contains(actual.getId()));
+		assertEquals(getGroupValue(tokens[1]), actual.getValue().getVertexGroupValue());
+		assertEquals(getGroupCount(tokens[1]), actual.getValue().getVertexGroupCount());
+	}
+
+	/**
+	 * Expected format: "candidate-source-ids;candidate-target-ids;group-value,group-count",
+	 * where each id field is a comma-separated list.
+	 */
+	private <EV> void validateEdge(String expected, Edge<Long, EdgeValue<EV>> actual) {
+		String[] tokens = TOKEN_SEPARATOR.split(expected);
+		assertTrue(getListFromIdRange(tokens[0]).contains(actual.getSource()));
+		assertTrue(getListFromIdRange(tokens[1]).contains(actual.getTarget()));
+		assertEquals(getGroupValue(tokens[2]), actual.getValue().getEdgeGroupValue().toString());
+		assertEquals(getGroupCount(tokens[2]), actual.getValue().getEdgeGroupCount());
+	}
+
+	/** Parses a comma-separated list of ids. */
+	private List<Long> getListFromIdRange(String idRange) {
+		List<Long> result = Lists.newArrayList();
+		for (String id : ID_SEPARATOR.split(idRange)) {
+			result.add(Long.parseLong(id));
+		}
+		return result;
+	}
+
+	/** Returns the group value, i.e. the part before the first comma. */
+	private String getGroupValue(String token) {
+		return ID_SEPARATOR.split(token)[0];
+	}
+
+	/** Returns the group count, i.e. the part after the first comma. */
+	private Long getGroupCount(String token) {
+		return Long.valueOf(ID_SEPARATOR.split(token)[1]);
+	}
+}
