http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java deleted file mode 100644 index 9e00760..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java +++ /dev/null @@ -1,266 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.example; - -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.EdgeDirection; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.example.utils.IncrementalSSSPData; -import org.apache.flink.graph.spargel.MessageIterator; -import org.apache.flink.graph.spargel.MessagingFunction; -import org.apache.flink.graph.spargel.ScatterGatherConfiguration; -import org.apache.flink.graph.spargel.VertexUpdateFunction; - -/** - * This example illustrates how to - * <ul> - * <li> create a Graph directly from CSV files - * <li> use the scatter-gather iteration's messaging direction configuration option - * </ul> - * - * Incremental Single Sink Shortest Paths Example. Shortest Paths are incrementally updated - * upon edge removal. - * - * The program takes as input the resulted graph after a SSSP computation, - * an edge to be removed and the initial graph(i.e. before SSSP was computed). - * In the following description, SP-graph is used as an abbreviation for - * the graph resulted from the SSSP computation. We denote the edges that belong to this - * graph by SP-edges. - * - * - If the removed edge does not belong to the SP-graph, no computation is necessary. - * The edge is simply removed from the graph. - * - If the removed edge is an SP-edge, then all nodes, whose shortest path contains the removed edge, - * potentially require re-computation. - * When the edge {@code <u, v>} is removed, v checks if it has another out-going SP-edge. - * If yes, no further computation is required. - * If v has no other out-going SP-edge, it invalidates its current value, by setting it to INF. - * Then, it informs all its SP-in-neighbors by sending them an INVALIDATE message. - * When a vertex u receives an INVALIDATE message from v, it checks whether it has another out-going SP-edge. - * If not, it invalidates its current value and propagates the INVALIDATE message. - * The propagation stops when a vertex with an alternative shortest path is reached - * or when we reach a vertex with no SP-in-neighbors. - * - * Usage <code>IncrementalSSSP <vertex path> <edge path> <edges in SSSP> - * <src id edge to be removed> <trg id edge to be removed> <val edge to be removed> - * <result path> <number of iterations></code><br> - * If no parameters are provided, the program is run with default data from - * {@link org.apache.flink.graph.example.utils.IncrementalSSSPData} - */ -@SuppressWarnings("serial") -public class IncrementalSSSP implements ProgramDescription { - - public static void main(String [] args) throws Exception { - - if(!parseParameters(args)) { - return; - } - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Edge<Long, Double> edgeToBeRemoved = getEdgeToBeRemoved(); - - Graph<Long, Double, Double> graph = IncrementalSSSP.getGraph(env); - - // Assumption: all minimum weight paths are kept - Graph<Long, Double, Double> ssspGraph = IncrementalSSSP.getSSSPGraph(env); - - // remove the edge - graph.removeEdge(edgeToBeRemoved); - - // configure the iteration - ScatterGatherConfiguration parameters = new ScatterGatherConfiguration(); - - if(isInSSSP(edgeToBeRemoved, ssspGraph.getEdges())) { - - parameters.setDirection(EdgeDirection.IN); - parameters.setOptDegrees(true); - - // run the scatter-gather iteration to propagate info - Graph<Long, Double, Double> result = ssspGraph.runScatterGatherIteration(new VertexDistanceUpdater(), - new InvalidateMessenger(edgeToBeRemoved), maxIterations, parameters); - - DataSet<Vertex<Long, Double>> resultedVertices = result.getVertices(); - - // Emit results - if(fileOutput) { - resultedVertices.writeAsCsv(outputPath, "\n", ","); - env.execute("Incremental SSSP Example"); - } else { - resultedVertices.print(); - } - } else { - // print the vertices - if(fileOutput) { - graph.getVertices().writeAsCsv(outputPath, "\n", ","); - env.execute("Incremental SSSP Example"); - } else { - graph.getVertices().print(); - } - } - } - - @Override - public String getDescription() { - return "Incremental Single Sink Shortest Paths Example"; - } - - // ****************************************************************************************************************** - // IncrementalSSSP METHODS - // ****************************************************************************************************************** - - /** - * Function that verifies whether the edge to be removed is part of the SSSP or not. - * If it is, the src vertex will be invalidated. - * - * @param edgeToBeRemoved - * @param edgesInSSSP - * @return true or false - */ - public static boolean isInSSSP(final Edge<Long, Double> edgeToBeRemoved, DataSet<Edge<Long, Double>> edgesInSSSP) throws Exception { - - return edgesInSSSP.filter(new FilterFunction<Edge<Long, Double>>() { - @Override - public boolean filter(Edge<Long, Double> edge) throws Exception { - return edge.equals(edgeToBeRemoved); - } - }).count() > 0; - } - - public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> { - - @Override - public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception { - if (inMessages.hasNext()) { - Long outDegree = getOutDegree() - 1; - // check if the vertex has another SP-Edge - if (outDegree <= 0) { - // set own value to infinity - setNewVertexValue(Double.MAX_VALUE); - } - } - } - } - - public static final class InvalidateMessenger extends MessagingFunction<Long, Double, Double, Double> { - - private Edge<Long, Double> edgeToBeRemoved; - - public InvalidateMessenger(Edge<Long, Double> edgeToBeRemoved) { - this.edgeToBeRemoved = edgeToBeRemoved; - } - - @Override - public void sendMessages(Vertex<Long, Double> vertex) throws Exception { - - - if(getSuperstepNumber() == 1) { - if(vertex.getId().equals(edgeToBeRemoved.getSource())) { - // activate the edge target - sendMessageTo(edgeToBeRemoved.getSource(), Double.MAX_VALUE); - } - } - - if(getSuperstepNumber() > 1) { - // invalidate all edges - for(Edge<Long, Double> edge : getEdges()) { - sendMessageTo(edge.getSource(), Double.MAX_VALUE); - } - } - } - } - - // ****************************************************************************************************************** - // UTIL METHODS - // ****************************************************************************************************************** - - private static boolean fileOutput = false; - - private static String verticesInputPath = null; - - private static String edgesInputPath = null; - - private static String edgesInSSSPInputPath = null; - - private static Long srcEdgeToBeRemoved = null; - - private static Long trgEdgeToBeRemoved = null; - - private static Double valEdgeToBeRemoved = null; - - private static String outputPath = null; - - private static int maxIterations = 5; - - private static boolean parseParameters(String[] args) { - if (args.length > 0) { - if (args.length == 8) { - fileOutput = true; - verticesInputPath = args[0]; - edgesInputPath = args[1]; - edgesInSSSPInputPath = args[2]; - srcEdgeToBeRemoved = Long.parseLong(args[3]); - trgEdgeToBeRemoved = Long.parseLong(args[4]); - valEdgeToBeRemoved = Double.parseDouble(args[5]); - outputPath = args[6]; - maxIterations = Integer.parseInt(args[7]); - } else { - System.out.println("Executing IncrementalSSSP example with default parameters and built-in default data."); - System.out.println("Provide parameters to read input data from files."); - System.out.println("See the documentation for the correct format of input files."); - System.out.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> " + - "<src id edge to be removed> <trg id edge to be removed> <val edge to be removed> " + - "<output path> <max iterations>"); - - return false; - } - } - return true; - } - - private static Graph<Long, Double, Double> getGraph(ExecutionEnvironment env) { - if(fileOutput) { - return Graph.fromCsvReader(verticesInputPath, edgesInputPath, env).lineDelimiterEdges("\n") - .types(Long.class, Double.class, Double.class); - } else { - return Graph.fromDataSet(IncrementalSSSPData.getDefaultVertexDataSet(env), IncrementalSSSPData.getDefaultEdgeDataSet(env), env); - } - } - - private static Graph<Long, Double, Double> getSSSPGraph(ExecutionEnvironment env) { - if(fileOutput) { - return Graph.fromCsvReader(verticesInputPath, edgesInSSSPInputPath, env).lineDelimiterEdges("\n") - .types(Long.class, Double.class, Double.class); - } else { - return Graph.fromDataSet(IncrementalSSSPData.getDefaultVertexDataSet(env), IncrementalSSSPData.getDefaultEdgesInSSSP(env), env); - } - } - - private static Edge<Long, Double> getEdgeToBeRemoved() { - if (fileOutput) { - return new Edge<Long, Double>(srcEdgeToBeRemoved, trgEdgeToBeRemoved, valEdgeToBeRemoved); - } else { - return IncrementalSSSPData.getDefaultEdgeToBeRemoved(); - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java deleted file mode 100644 index 5fb75e2..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.example; - -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.EdgeDirection; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.ReduceNeighborsFunction; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.Triplet; -import org.apache.flink.graph.VertexJoinFunction; -import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData; - -import java.util.HashSet; - -/** - * This example shows how to use - * <ul> - * <li> neighborhood methods - * <li> join with vertices - * <li> triplets - * </ul> - * - * Given a directed, unweighted graph, return a weighted graph where the edge values are equal - * to the Jaccard similarity coefficient - the number of common neighbors divided by the the size - * of the union of neighbor sets - for the src and target vertices. - * - * <p> - * Input files are plain text files and must be formatted as follows: - * <br> - * Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs. - * Edges themselves are separated by newlines. - * For example: <code>1 2\n1 3\n</code> defines two edges 1-2 and 1-3. - * </p> - * - * Usage <code> JaccardSimilarityMeasure <edge path> <result path></code><br> - * If no parameters are provided, the program is run with default data from - * {@link org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData} - */ -@SuppressWarnings("serial") -public class JaccardSimilarityMeasure implements ProgramDescription { - - public static void main(String [] args) throws Exception { - - if(!parseParameters(args)) { - return; - } - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env); - - Graph<Long, HashSet<Long>, Double> graph = Graph.fromDataSet(edges, - new MapFunction<Long, HashSet<Long>>() { - - @Override - public HashSet<Long> map(Long id) throws Exception { - HashSet<Long> neighbors = new HashSet<Long>(); - neighbors.add(id); - - return new HashSet<Long>(neighbors); - } - }, env); - - // create the set of neighbors - DataSet<Tuple2<Long, HashSet<Long>>> computedNeighbors = - graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL); - - // join with the vertices to update the node values - Graph<Long, HashSet<Long>, Double> graphWithVertexValues = - graph.joinWithVertices(computedNeighbors, new VertexJoinFunction<HashSet<Long>, - HashSet<Long>>() { - - public HashSet<Long> vertexJoin(HashSet<Long> vertexValue, HashSet<Long> inputValue) { - return inputValue; - } - }); - - // compare neighbors, compute Jaccard - DataSet<Edge<Long, Double>> edgesWithJaccardValues = - graphWithVertexValues.getTriplets().map(new ComputeJaccard()); - - // emit result - if (fileOutput) { - edgesWithJaccardValues.writeAsCsv(outputPath, "\n", ","); - - // since file sinks are lazy, we trigger the execution explicitly - env.execute("Executing Jaccard Similarity Measure"); - } else { - edgesWithJaccardValues.print(); - } - - } - - @Override - public String getDescription() { - return "Vertex Jaccard Similarity Measure"; - } - - /** - * Each vertex will have a HashSet containing its neighbor ids as value. - */ - private static final class GatherNeighbors implements ReduceNeighborsFunction<HashSet<Long>> { - - @Override - public HashSet<Long> reduceNeighbors(HashSet<Long> first, HashSet<Long> second) { - first.addAll(second); - return new HashSet<Long>(first); - } - } - - /** - * The edge weight will be the Jaccard coefficient, which is computed as follows: - * - * Consider the edge x-y - * We denote by sizeX and sizeY, the neighbors hash set size of x and y respectively. - * sizeX+sizeY = union + intersection of neighborhoods - * size(hashSetX.addAll(hashSetY)).distinct = union of neighborhoods - * The intersection can then be deduced. - * - * The Jaccard similarity coefficient is then, the intersection/union. - */ - private static final class ComputeJaccard implements - MapFunction<Triplet<Long, HashSet<Long>, Double>, Edge<Long, Double>> { - - @Override - public Edge<Long, Double> map(Triplet<Long, HashSet<Long>, Double> triplet) throws Exception { - - Vertex<Long, HashSet<Long>> srcVertex = triplet.getSrcVertex(); - Vertex<Long, HashSet<Long>> trgVertex = triplet.getTrgVertex(); - - Long x = srcVertex.getId(); - Long y = trgVertex.getId(); - HashSet<Long> neighborSetY = trgVertex.getValue(); - - double unionPlusIntersection = srcVertex.getValue().size() + neighborSetY.size(); - // within a HashSet, all elements are distinct - HashSet<Long> unionSet = new HashSet<Long>(); - unionSet.addAll(srcVertex.getValue()); - unionSet.addAll(neighborSetY); - double union = unionSet.size(); - double intersection = unionPlusIntersection - union; - - return new Edge<Long, Double>(x, y, intersection/union); - } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String edgeInputPath = null; - private static String outputPath = null; - - private static boolean parseParameters(String [] args) { - if(args.length > 0) { - if(args.length != 2) { - System.err.println("Usage JaccardSimilarityMeasure <edge path> <output path>"); - return false; - } - - fileOutput = true; - edgeInputPath = args[0]; - outputPath = args[1]; - } else { - System.out.println("Executing JaccardSimilarityMeasure example with default parameters and built-in default data."); - System.out.println("Provide parameters to read input data from files."); - System.out.println("Usage JaccardSimilarityMeasure <edge path> <output path>"); - } - - return true; - } - - private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) { - - if(fileOutput) { - return env.readCsvFile(edgeInputPath) - .ignoreComments("#") - .fieldDelimiter("\t") - .lineDelimiter("\n") - .types(Long.class, Long.class) - .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() { - @Override - public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception { - return new Edge<Long, Double>(tuple2.f0, tuple2.f1, new Double(0)); - } - }); - } else { - return JaccardSimilarityMeasureData.getDefaultEdgeDataSet(env); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java deleted file mode 100644 index b2857d0..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java +++ /dev/null @@ -1,303 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.example; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.CoGroupFunction; -import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.utils.DataSetUtils; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.EdgeDirection; -import org.apache.flink.graph.EdgesFunctionWithVertexValue; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.VertexJoinFunction; -import org.apache.flink.graph.example.utils.MusicProfilesData; -import org.apache.flink.graph.library.LabelPropagation; -import org.apache.flink.types.NullValue; -import org.apache.flink.util.Collector; - -/** - * This example demonstrates how to mix the DataSet Flink API with the Gelly API. - * The input is a set <userId - songId - playCount> triplets and - * a set of bad records, i.e. song ids that should not be trusted. - * Initially, we use the DataSet API to filter out the bad records. - * Then, we use Gelly to create a user -> song weighted bipartite graph and compute - * the top song (most listened) per user. - * Then, we use the DataSet API again, to create a user-user similarity graph, - * based on common songs, where users that are listeners of the same song - * are connected. A user-defined threshold on the playcount value - * defines when a user is considered to be a listener of a song. - * Finally, we use the graph API to run the label propagation community detection algorithm on - * the similarity graph. - * - * The triplets input is expected to be given as one triplet per line, - * in the following format: "<userID>\t<songID>\t<playcount>". - * - * The mismatches input file is expected to contain one mismatch record per line, - * in the following format: - * "ERROR: <songID trackID> song_title" - * - * If no arguments are provided, the example runs with default data from {@link MusicProfilesData}. - */ -@SuppressWarnings("serial") -public class MusicProfiles implements ProgramDescription { - - public static void main(String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - /** - * Read the user-song-play triplets. - */ - DataSet<Tuple3<String, String, Integer>> triplets = getUserSongTripletsData(env); - - /** - * Read the mismatches dataset and extract the songIDs - */ - DataSet<Tuple1<String>> mismatches = getMismatchesData(env).map(new ExtractMismatchSongIds()); - - /** - * Filter out the mismatches from the triplets dataset - */ - DataSet<Tuple3<String, String, Integer>> validTriplets = triplets - .coGroup(mismatches).where(1).equalTo(0) - .with(new FilterOutMismatches()); - - /** - * Create a user -> song weighted bipartite graph where the edge weights - * correspond to play counts - */ - Graph<String, NullValue, Integer> userSongGraph = Graph.fromTupleDataSet(validTriplets, env); - - /** - * Get the top track (most listened) for each user - */ - DataSet<Tuple2<String, String>> usersWithTopTrack = userSongGraph - .groupReduceOnEdges(new GetTopSongPerUser(), EdgeDirection.OUT) - .filter(new FilterSongNodes()); - - if (fileOutput) { - usersWithTopTrack.writeAsCsv(topTracksOutputPath, "\n", "\t"); - } else { - usersWithTopTrack.print(); - } - - /** - * Create a user-user similarity graph, based on common songs, i.e. two - * users that listen to the same song are connected. For each song, we - * create an edge between each pair of its in-neighbors. - */ - DataSet<Edge<String, NullValue>> similarUsers = userSongGraph - .getEdges() - // filter out user-song edges that are below the playcount threshold - .filter(new FilterFunction<Edge<String, Integer>>() { - public boolean filter(Edge<String, Integer> edge) { - return (edge.getValue() > playcountThreshold); - } - }).groupBy(1) - .reduceGroup(new CreateSimilarUserEdges()).distinct(); - - Graph<String, Long, NullValue> similarUsersGraph = Graph.fromDataSet(similarUsers, - new MapFunction<String, Long>() { - public Long map(String value) { - return 1l; - } - }, env).getUndirected(); - - /** - * Detect user communities using the label propagation library method - */ - // Initialize each vertex with a unique numeric label and run the label propagation algorithm - DataSet<Tuple2<String, Long>> idsWithInitialLabels = DataSetUtils - .zipWithUniqueId(similarUsersGraph.getVertexIds()) - .map(new MapFunction<Tuple2<Long, String>, Tuple2<String, Long>>() { - @Override - public Tuple2<String, Long> map(Tuple2<Long, String> tuple2) throws Exception { - return new Tuple2<String, Long>(tuple2.f1, tuple2.f0); - } - }); - - DataSet<Vertex<String, Long>> verticesWithCommunity = similarUsersGraph - .joinWithVertices(idsWithInitialLabels, - new VertexJoinFunction<Long, Long>() { - public Long vertexJoin(Long vertexValue, Long inputValue) { - return inputValue; - } - }).run(new LabelPropagation<String, Long, NullValue>(maxIterations)); - - if (fileOutput) { - verticesWithCommunity.writeAsCsv(communitiesOutputPath, "\n", "\t"); - - // since file sinks are lazy, we trigger the execution explicitly - env.execute(); - } else { - verticesWithCommunity.print(); - } - - } - - public static final class ExtractMismatchSongIds implements MapFunction<String, Tuple1<String>> { - - public Tuple1<String> map(String value) { - String[] tokens = value.split("\\s+"); - String songId = tokens[1].substring(1); - return new Tuple1<String>(songId); - } - } - - public static final class FilterOutMismatches implements CoGroupFunction<Tuple3<String, String, Integer>, - Tuple1<String>, Tuple3<String, String, Integer>> { - - public void coGroup(Iterable<Tuple3<String, String, Integer>> triplets, - Iterable<Tuple1<String>> invalidSongs, Collector<Tuple3<String, String, Integer>> out) { - - if (!invalidSongs.iterator().hasNext()) { - // this is a valid triplet - for (Tuple3<String, String, Integer> triplet : triplets) { - out.collect(triplet); - } - } - } - } - - public static final class FilterSongNodes implements FilterFunction<Tuple2<String, String>> { - public boolean filter(Tuple2<String, String> value) throws Exception { - return !value.f1.equals(""); - } - } - - public static final class GetTopSongPerUser implements EdgesFunctionWithVertexValue<String, NullValue, Integer, - Tuple2<String, String>> { - - public void iterateEdges(Vertex<String, NullValue> vertex, - Iterable<Edge<String, Integer>> edges, Collector<Tuple2<String, String>> out) throws Exception { - - int maxPlaycount = 0; - String topSong = ""; - for (Edge<String, Integer> edge : edges) { - if (edge.getValue() > maxPlaycount) { - maxPlaycount = edge.getValue(); - topSong = edge.getTarget(); - } - } - out.collect(new Tuple2<String, String>(vertex.getId(), topSong)); - } - } - - public static final class CreateSimilarUserEdges implements GroupReduceFunction<Edge<String, Integer>, - Edge<String, NullValue>> { - - public void reduce(Iterable<Edge<String, Integer>> edges, Collector<Edge<String, NullValue>> out) { - List<String> listeners = new ArrayList<String>(); - for (Edge<String, Integer> edge : edges) { - listeners.add(edge.getSource()); - } - for (int i = 0; i < listeners.size() - 1; i++) { - for (int j = i + 1; j < listeners.size(); j++) { - out.collect(new Edge<String, NullValue>(listeners.get(i), - listeners.get(j), NullValue.getInstance())); - } - } - } - } - - @Override - public String getDescription() { - return "Music Profiles Example"; - } - - // ****************************************************************************************************************** - // UTIL METHODS - // ****************************************************************************************************************** - - private static boolean fileOutput = false; - - private static String userSongTripletsInputPath = null; - - private static String mismatchesInputPath = null; - - private static String topTracksOutputPath = null; - - private static int playcountThreshold = 0; - - private static String communitiesOutputPath = null; - - private static int maxIterations = 10; - - private static boolean parseParameters(String[] args) { - - if(args.length > 0) { - if(args.length != 6) { - System.err.println("Usage: MusicProfiles <input user song triplets path>" + - " <input song mismatches path> <output top tracks path> " - + "<playcount threshold> <output communities path> <num iterations>"); - return false; - } - - fileOutput = true; - userSongTripletsInputPath = args[0]; - mismatchesInputPath = args[1]; - topTracksOutputPath = args[2]; - playcountThreshold = Integer.parseInt(args[3]); - communitiesOutputPath = args[4]; - maxIterations = Integer.parseInt(args[5]); - } else { - System.out.println("Executing Music Profiles example with default parameters and built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); - System.out.println("Usage: MusicProfiles <input user song triplets path>" + - " <input song mismatches path> <output top tracks path> " - + "<playcount threshold> <output communities path> <num iterations>"); - } - return true; - } - - private static DataSet<Tuple3<String, String, Integer>> getUserSongTripletsData(ExecutionEnvironment env) { - if (fileOutput) { - return env.readCsvFile(userSongTripletsInputPath) - .lineDelimiter("\n").fieldDelimiter("\t") - .types(String.class, String.class, Integer.class); - } else { - return MusicProfilesData.getUserSongTriplets(env); - } - } - - private static DataSet<String> getMismatchesData(ExecutionEnvironment env) { - if (fileOutput) { - return env.readTextFile(mismatchesInputPath); - } else { - return MusicProfilesData.getMismatches(env); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java deleted file mode 100644 index ba84e80..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.example; - -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData; -import org.apache.flink.graph.spargel.MessageIterator; -import org.apache.flink.graph.spargel.MessagingFunction; -import org.apache.flink.graph.spargel.VertexUpdateFunction; -import org.apache.flink.graph.utils.Tuple3ToEdgeMap; - -/** - * This example shows how to use Gelly's scatter-gather iterations. - * - * It is an implementation of the Single-Source-Shortest-Paths algorithm. - * For a gather-sum-apply implementation of the same algorithm, please refer to {@link GSASingleSourceShortestPaths}. - * - * The input file is a plain text file and must be formatted as follows: - * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are - * separated by tabs. Edges themselves are separated by newlines. - * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges, - * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4. - * - * If no parameters are provided, the program is run with default data from - * {@link org.apache.flink.graph.example.utils.SingleSourceShortestPathsData} - */ -public class SingleSourceShortestPaths implements ProgramDescription { - - public static void main(String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env); - - Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env); - - // Execute the scatter-gather iteration - Graph<Long, Double, Double> result = graph.runScatterGatherIteration( - new VertexDistanceUpdater(), new MinDistanceMessenger(), maxIterations); - - // Extract the vertices as the result - DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices(); - - // emit result - if (fileOutput) { - singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ","); - - // since file sinks are lazy, we trigger the execution explicitly - env.execute("Single Source Shortest Paths Example"); - } else { - singleSourceShortestPaths.print(); - } - - } - - // -------------------------------------------------------------------------------------------- - // Single Source Shortest Path UDFs - // -------------------------------------------------------------------------------------------- - - @SuppressWarnings("serial") - private static final class InitVertices implements MapFunction<Long, Double>{ - - private long srcId; - - public InitVertices(long srcId) { - this.srcId = srcId; - } - - public Double map(Long id) { - if (id.equals(srcId)) { - return 0.0; - } - else { - return Double.POSITIVE_INFINITY; - } - } - } - - /** - * Function that updates the value of a vertex by picking the minimum - * distance from all incoming messages. - */ - @SuppressWarnings("serial") - public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> { - - @Override - public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) { - - Double minDistance = Double.MAX_VALUE; - - for (double msg : inMessages) { - if (msg < minDistance) { - minDistance = msg; - } - } - - if (vertex.getValue() > minDistance) { - setNewVertexValue(minDistance); - } - } - } - - /** - * Distributes the minimum distance associated with a given vertex among all - * the target vertices summed up with the edge's value. - */ - @SuppressWarnings("serial") - public static final class MinDistanceMessenger extends MessagingFunction<Long, Double, Double, Double> { - - @Override - public void sendMessages(Vertex<Long, Double> vertex) { - if (vertex.getValue() < Double.POSITIVE_INFINITY) { - for (Edge<Long, Double> edge : getEdges()) { - sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue()); - } - } - } - } - - // ****************************************************************************************************************** - // UTIL METHODS - // ****************************************************************************************************************** - - private static boolean fileOutput = false; - - private static Long srcVertexId = 1l; - - private static String edgesInputPath = null; - - private static String outputPath = null; - - private static int maxIterations = 5; - - private static boolean parseParameters(String[] args) { - - if(args.length > 0) { - if(args.length != 4) { - System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" + - " <input edges path> <output path> <num iterations>"); - return false; - } - - fileOutput = true; - srcVertexId = Long.parseLong(args[0]); - edgesInputPath = args[1]; - outputPath = args[2]; - maxIterations = Integer.parseInt(args[3]); - } else { - System.out.println("Executing Single Source Shortest Paths example " - + "with default parameters and built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); - System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" + - " <input edges path> <output path> <num iterations>"); - } - return true; - } - - private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) { - if (fileOutput) { - return env.readCsvFile(edgesInputPath) - .lineDelimiter("\n") - .fieldDelimiter("\t") - .types(Long.class, Long.class, Double.class) - .map(new Tuple3ToEdgeMap<Long, Double>()); - } else { - return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env); - } - } - - @Override - public String getDescription() { - return "Scatter-gather Single Source Shortest Paths"; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java deleted file mode 100644 index c37b2b5..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.example.utils; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; - -import java.util.ArrayList; -import java.util.List; - -/** - * Provides the default data set used for the Simple Community Detection test program. - * If no parameters are given to the program, the default edge data set is used. - */ -public class CommunityDetectionData { - - // the algorithm is not guaranteed to always converge - public static final Integer MAX_ITERATIONS = 30; - - public static final double DELTA = 0.5f; - - public static final String COMMUNITIES_SINGLE_ITERATION = "1,5\n" + "2,6\n" - + "3,1\n" + "4,1\n" + "5,1\n" + "6,8\n" + "7,8\n" + "8,7"; - - public static final String COMMUNITIES_WITH_TIE = "1,2\n" + "2,1\n" + "3,1\n" + "4,1\n" + "5,1"; - - public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) { - - List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>(); - edges.add(new Edge<Long, Double>(1L, 2L, 1.0)); - edges.add(new Edge<Long, Double>(1L, 3L, 2.0)); - edges.add(new Edge<Long, Double>(1L, 4L, 3.0)); - edges.add(new Edge<Long, Double>(2L, 3L, 4.0)); - edges.add(new Edge<Long, Double>(2L, 4L, 5.0)); - edges.add(new Edge<Long, Double>(3L, 5L, 6.0)); - edges.add(new Edge<Long, Double>(5L, 6L, 7.0)); - edges.add(new Edge<Long, Double>(5L, 7L, 8.0)); - edges.add(new Edge<Long, Double>(6L, 7L, 9.0)); - edges.add(new Edge<Long, Double>(7L, 12L, 10.0)); - edges.add(new Edge<Long, Double>(8L, 9L, 11.0)); - edges.add(new Edge<Long, Double>(8L, 10L, 12.0)); - edges.add(new Edge<Long, Double>(8L, 11L, 13.0)); - edges.add(new Edge<Long, Double>(9L, 10L, 14.0)); - edges.add(new Edge<Long, Double>(9L, 11L, 15.0)); - edges.add(new Edge<Long, Double>(10L, 11L, 16.0)); - edges.add(new Edge<Long, Double>(10L, 12L, 17.0)); - edges.add(new Edge<Long, Double>(11L, 12L, 18.0)); - - return env.fromCollection(edges); - } - - public static DataSet<Edge<Long, Double>> getSimpleEdgeDataSet(ExecutionEnvironment env) { - - List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>(); - edges.add(new Edge<Long, Double>(1L, 2L, 1.0)); - edges.add(new Edge<Long, Double>(1L, 3L, 2.0)); - edges.add(new Edge<Long, Double>(1L, 4L, 3.0)); - edges.add(new Edge<Long, Double>(1L, 5L, 4.0)); - edges.add(new Edge<Long, Double>(2L, 6L, 5.0)); - edges.add(new Edge<Long, Double>(6L, 7L, 6.0)); - edges.add(new Edge<Long, Double>(6L, 8L, 7.0)); - edges.add(new Edge<Long, Double>(7L, 8L, 8.0)); - - return env.fromCollection(edges); - } - - private CommunityDetectionData() {} - - public static DataSet<Edge<Long, Double>> getTieEdgeDataSet(ExecutionEnvironment env) { - List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>(); - edges.add(new Edge<Long, Double>(1L, 2L, 1.0)); - edges.add(new Edge<Long, Double>(1L, 3L, 1.0)); - edges.add(new Edge<Long, Double>(1L, 4L, 1.0)); - edges.add(new Edge<Long, Double>(1L, 5L, 1.0)); - - return env.fromCollection(edges); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java deleted file mode 100644 index 67864eb..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.example.utils; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; -import org.apache.flink.types.NullValue; - -import java.util.LinkedList; -import java.util.List; - -/** - * Provides the default data sets used for the connected components example program. - * If no parameters are given to the program, the default data sets are used. - */ -public class ConnectedComponentsDefaultData { - - public static final Integer MAX_ITERATIONS = 4; - - public static final String EDGES = "1 2\n" + "2 3\n" + "2 4\n" + "3 4"; - - public static final Object[][] DEFAULT_EDGES = new Object[][] { - new Object[]{1L, 2L}, - new Object[]{2L, 3L}, - new Object[]{2L, 4L}, - new Object[]{3L, 4L} - }; - - public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) { - List<Edge<Long, NullValue>> edgeList = new LinkedList<Edge<Long, NullValue>>(); - for (Object[] edge : DEFAULT_EDGES) { - edgeList.add(new Edge<Long, NullValue>((Long) edge[0], (Long) edge[1], NullValue.getInstance())); - } - return env.fromCollection(edgeList); - } - - public static final String VERTICES_WITH_MIN_ID = "1,1\n" + "2,1\n" + "3,1\n" + "4,1"; - - private ConnectedComponentsDefaultData() {} -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java deleted file mode 100644 index 80765bf..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.example.utils; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.example.EuclideanGraphWeighing; - -import java.util.ArrayList; -import java.util.List; - -/** - * Provides the default data sets used for the Euclidean Graph example program. - * If no parameters are given to the program, the default data sets are used. - */ -public class EuclideanGraphData { - - public static final int NUM_VERTICES = 9; - - public static final String VERTICES = "1,1.0,1.0\n" + "2,2.0,2.0\n" + "3,3.0,3.0\n" + "4,4.0,4.0\n" + "5,5.0,5.0\n" + - "6,6.0,6.0\n" + "7,7.0,7.0\n" + "8,8.0,8.0\n" + "9,9.0,9.0"; - - public static DataSet<Vertex<Long, EuclideanGraphWeighing.Point>> getDefaultVertexDataSet(ExecutionEnvironment env) { - - List<Vertex<Long, EuclideanGraphWeighing.Point>> vertices = new ArrayList<Vertex<Long, EuclideanGraphWeighing.Point>>(); - for(int i=1; i<=NUM_VERTICES; i++) { - vertices.add(new Vertex<Long, EuclideanGraphWeighing.Point>(new Long(i), - new EuclideanGraphWeighing.Point(new Double(i), new Double(i)))); - } - - return env.fromCollection(vertices); - } - - public static final String EDGES = "1,2\n" + "1,4\n" + "2,3\n" + "2,4\n" + "2,5\n" + - "3,5\n" + "4,5\n" + "4,6\n" + "5,7\n" + "5,9\n" + "6,7\n" + "6,8\n" + - "7,8\n" + "7,9\n" + "8,9"; - - public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) { - - List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>(); - edges.add(new Edge<Long, Double>(1L, 2L, 0.0)); - edges.add(new Edge<Long, Double>(1L, 4L, 0.0)); - edges.add(new Edge<Long, Double>(2L, 3L, 0.0)); - edges.add(new Edge<Long, Double>(2L, 4L, 0.0)); - edges.add(new Edge<Long, Double>(2L, 5L, 0.0)); - edges.add(new Edge<Long, Double>(3L, 5L, 0.0)); - edges.add(new Edge<Long, Double>(4L, 5L, 0.0)); - edges.add(new Edge<Long, Double>(4L, 6L, 0.0)); - edges.add(new Edge<Long, Double>(5L, 7L, 0.0)); - edges.add(new Edge<Long, Double>(5L, 9L, 0.0)); - edges.add(new Edge<Long, Double>(6L, 7L, 0.0)); - edges.add(new Edge<Long, Double>(6L, 8L, 0.0)); - edges.add(new Edge<Long, Double>(6L, 8L, 0.0)); - edges.add(new Edge<Long, Double>(7L, 8L, 0.0)); - edges.add(new Edge<Long, Double>(7L, 9L, 0.0)); - edges.add(new Edge<Long, Double>(8L, 9L, 0.0)); - - return env.fromCollection(edges); - } - - public static final String RESULTED_WEIGHTED_EDGES = "1,2,1.4142135623730951\n" + "1,4,4.242640687119285\n" + - "2,3,1.4142135623730951\n" + "2,4,2.8284271247461903\n" + "2,5,4.242640687119285\n" + "3,5,2.8284271247461903\n" + - "4,5,1.4142135623730951\n" + "4,6,2.8284271247461903\n" + "5,7,2.8284271247461903\n" + "5,9,5.656854249492381\n" + - "6,7,1.4142135623730951\n" + "6,8,2.8284271247461903\n" + "7,8,1.4142135623730951\n" + "7,9,2.8284271247461903\n" + - "8,9,1.4142135623730951"; - - private EuclideanGraphData() {} -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java deleted file mode 100644 index 7fbee46..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.example.utils; - -import java.io.PrintStream; -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.io.OutputFormat; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Vertex; -import org.apache.flink.types.NullValue; -import org.apache.flink.util.Collector; - -public class ExampleUtils { - - @SuppressWarnings({ "serial", "unchecked", "rawtypes" }) - public static void printResult(DataSet set, String msg) { - set.output(new PrintingOutputFormatWithMessage(msg) { - }); - } - - public static class PrintingOutputFormatWithMessage<T> implements - OutputFormat<T> { - - private static final long serialVersionUID = 1L; - - private transient PrintStream stream; - - private transient String prefix; - - private String message; - - // -------------------------------------------------------------------------------------------- - - /** - * Instantiates a printing output format that prints to standard out. - */ - public PrintingOutputFormatWithMessage() { - } - - public PrintingOutputFormatWithMessage(String msg) { - this.message = msg; - } - - @Override - public void open(int taskNumber, int numTasks) { - // get the target stream - this.stream = System.out; - - // set the prefix to message - this.prefix = message + ": "; - } - - @Override - public void writeRecord(T record) { - if (this.prefix != null) { - this.stream.println(this.prefix + record.toString()); - } else { - this.stream.println(record.toString()); - } - } - - @Override - public void close() { - this.stream = null; - this.prefix = null; - } - - @Override - public String toString() { - return "Print to System.out"; - } - - @Override - public void configure(Configuration parameters) { - } - } - - @SuppressWarnings("serial") - public static DataSet<Vertex<Long, NullValue>> getVertexIds( - ExecutionEnvironment env, final long numVertices) { - return env.generateSequence(1, numVertices).map( - new MapFunction<Long, Vertex<Long, NullValue>>() { - public Vertex<Long, NullValue> map(Long l) { - return new Vertex<Long, NullValue>(l, NullValue - .getInstance()); - } - }); - } - - @SuppressWarnings("serial") - public static DataSet<Edge<Long, NullValue>> getRandomEdges( - ExecutionEnvironment env, final long numVertices) { - return env.generateSequence(1, numVertices).flatMap( - new FlatMapFunction<Long, Edge<Long, NullValue>>() { - @Override - public void flatMap(Long key, Collector<Edge<Long, NullValue>> out) throws Exception { - int numOutEdges = (int) (Math.random() * (numVertices / 2)); - for (int i = 0; i < numOutEdges; i++) { - long target = (long) (Math.random() * numVertices) + 1; - out.collect(new Edge<Long, NullValue>(key, target, - NullValue.getInstance())); - } - } - }); - } - - public static DataSet<Vertex<Long, Double>> getLongDoubleVertexData( - ExecutionEnvironment env) { - List<Vertex<Long, Double>> vertices = new ArrayList<Vertex<Long, Double>>(); - vertices.add(new Vertex<Long, Double>(1L, 1.0)); - vertices.add(new Vertex<Long, Double>(2L, 2.0)); - vertices.add(new Vertex<Long, Double>(3L, 3.0)); - vertices.add(new Vertex<Long, Double>(4L, 4.0)); - vertices.add(new Vertex<Long, Double>(5L, 5.0)); - - return env.fromCollection(vertices); - } - - public static DataSet<Edge<Long, Double>> getLongDoubleEdgeData( - ExecutionEnvironment env) { - List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>(); - edges.add(new Edge<Long, Double>(1L, 2L, 12.0)); - edges.add(new Edge<Long, Double>(1L, 3L, 13.0)); - edges.add(new Edge<Long, Double>(2L, 3L, 23.0)); - edges.add(new Edge<Long, Double>(3L, 4L, 34.0)); - edges.add(new Edge<Long, Double>(3L, 5L, 35.0)); - edges.add(new Edge<Long, Double>(4L, 5L, 45.0)); - edges.add(new Edge<Long, Double>(5L, 1L, 51.0)); - - return env.fromCollection(edges); - } - - /** - * Private constructor to prevent instantiation. - */ - private ExampleUtils() { - throw new RuntimeException(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/IncrementalSSSPData.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/IncrementalSSSPData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/IncrementalSSSPData.java deleted file mode 100644 index 7b69ec0..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/IncrementalSSSPData.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.example.utils; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Vertex; - -import java.util.ArrayList; -import java.util.List; - -/** - * Provides the default data sets used for the IncrementalSSSP example program. - * If no parameters are given to the program, the default data sets are used. - */ -public class IncrementalSSSPData { - - public static final int NUM_VERTICES = 5; - - public static final String VERTICES = "1,6.0\n" + "2,2.0\n" + "3,3.0\n" + "4,1.0\n" + "5,0.0"; - - public static DataSet<Vertex<Long, Double>> getDefaultVertexDataSet(ExecutionEnvironment env) { - - List<Vertex<Long, Double>> vertices = new ArrayList<Vertex<Long, Double>>(); - vertices.add(new Vertex<Long, Double>(1L, 6.0)); - vertices.add(new Vertex<Long, Double>(2L, 2.0)); - vertices.add(new Vertex<Long, Double>(3L, 3.0)); - vertices.add(new Vertex<Long, Double>(4L, 1.0)); - vertices.add(new Vertex<Long, Double>(5L, 0.0)); - - return env.fromCollection(vertices); - } - - public static final String EDGES = "1,3,3.0\n" + "2,4,3.0\n" + "2,5,2.0\n" + "3,2,1.0\n" + "3,5,5.0\n" + - "4,5,1.0"; - - public static final DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) { - - List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>(); - edges.add(new Edge<Long, Double>(1L, 3L, 3.0)); - edges.add(new Edge<Long, Double>(2L, 4L, 3.0)); - edges.add(new Edge<Long, Double>(2L, 5L, 2.0)); - edges.add(new Edge<Long, Double>(3L, 2L, 1.0)); - edges.add(new Edge<Long, Double>(3L, 5L, 5.0)); - edges.add(new Edge<Long, Double>(4L, 5L, 1.0)); - - return env.fromCollection(edges); - } - - public static final String EDGES_IN_SSSP = "1,3,3.0\n" + "2,5,2.0\n" + "3,2,1.0\n" + "4,5,1.0"; - - public static final DataSet<Edge<Long, Double>> getDefaultEdgesInSSSP(ExecutionEnvironment env) { - - List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>(); - edges.add(new Edge<Long, Double>(1L, 3L, 3.0)); - edges.add(new Edge<Long, Double>(2L, 5L, 2.0)); - edges.add(new Edge<Long, Double>(3L, 2L, 1.0)); - edges.add(new Edge<Long, Double>(4L, 5L, 1.0)); - - return env.fromCollection(edges); - } - - public static final String SRC_EDGE_TO_BE_REMOVED = "2"; - - public static final String TRG_EDGE_TO_BE_REMOVED = "5"; - - public static final String VAL_EDGE_TO_BE_REMOVED = "2.0"; - - public static final Edge<Long, Double> getDefaultEdgeToBeRemoved() { - - return new Edge<Long, Double>(2L, 5L, 2.0); - } - - public static final String RESULTED_VERTICES = "1," + Double.MAX_VALUE + "\n" + "2," + Double.MAX_VALUE+ "\n" - + "3," + Double.MAX_VALUE + "\n" + "4,1.0\n" + "5,0.0"; - - private IncrementalSSSPData() {} -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java deleted file mode 100644 index 7564b95..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.example.utils; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; - -import java.util.ArrayList; -import java.util.List; - -/** - * Provides the default data sets used for the Jaccard Similarity Measure example program. - * If no parameters are given to the program, the default data sets are used. - */ -public class JaccardSimilarityMeasureData { - - public static final String EDGES = "1 2\n" + "1 3\n" + "1 4\n" + "1 5\n" + "2 3\n" + "2 4\n" + - "2 5\n" + "3 4\n" + "3 5\n" + "4 5"; - - public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) { - - List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>(); - edges.add(new Edge<Long, Double>(1L, 2L, new Double(0))); - edges.add(new Edge<Long, Double>(1L, 3L, new Double(0))); - edges.add(new Edge<Long, Double>(1L, 4L, new Double(0))); - edges.add(new Edge<Long, Double>(1L, 5L, new Double(0))); - edges.add(new Edge<Long, Double>(2L, 3L, new Double(0))); - edges.add(new Edge<Long, Double>(2L, 4L, new Double(0))); - edges.add(new Edge<Long, Double>(2L, 5L, new Double(0))); - edges.add(new Edge<Long, Double>(3L, 4L, new Double(0))); - edges.add(new Edge<Long, Double>(3L, 5L, new Double(0))); - edges.add(new Edge<Long, Double>(4L, 5L, new Double(0))); - - return env.fromCollection(edges); - } - - public static final String JACCARD_EDGES = "1,2,0.6\n" + "1,3,0.6\n" + "1,4,0.6\n" + "1,5,0.6\n" + - "2,3,0.6\n" + "2,4,0.6\n" + "2,5,0.6\n" + "3,4,0.6\n" + "3,5,0.6\n" + "4,5,0.6"; - - private JaccardSimilarityMeasureData() {} -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java deleted file mode 100644 index 0a92097..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.example.utils; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Vertex; -import org.apache.flink.types.NullValue; - -/** - * Provides the default data set used for the Label Propagation test program. - * If no parameters are given to the program, the default edge data set is used. - */ -public class LabelPropagationData { - - public static final String LABELS_AFTER_1_ITERATION = "1,10\n" + - "2,10\n" + - "3,10\n" + - "4,40\n" + - "5,40\n" + - "6,40\n" + - "7,40\n"; - - public static final String LABELS_WITH_TIE ="1,10\n" + - "2,10\n" + - "3,10\n" + - "4,10\n" + - "5,20\n" + - "6,20\n" + - "7,20\n" + - "8,20\n" + - "9,20\n"; - - private LabelPropagationData() {} - - public static final DataSet<Vertex<Long, Long>> getDefaultVertexSet(ExecutionEnvironment env) { - - List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>(); - vertices.add(new Vertex<Long, Long>(1l, 10l)); - vertices.add(new Vertex<Long, Long>(2l, 10l)); - vertices.add(new Vertex<Long, Long>(3l, 30l)); - vertices.add(new Vertex<Long, Long>(4l, 40l)); - vertices.add(new Vertex<Long, Long>(5l, 40l)); - vertices.add(new Vertex<Long, Long>(6l, 40l)); - vertices.add(new Vertex<Long, Long>(7l, 40l)); - - return env.fromCollection(vertices); - } - - public static final DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) { - - List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>(); - edges.add(new Edge<Long, NullValue>(1L, 3L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(2L, 3L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(4L, 7L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(5L, 7L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(6L, 7L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(7L, 3L, NullValue.getInstance())); - - return env.fromCollection(edges); - } - - public static final DataSet<Vertex<Long, Long>> getTieVertexSet(ExecutionEnvironment env) { - - List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>(); - vertices.add(new Vertex<Long, Long>(1l, 10l)); - vertices.add(new Vertex<Long, Long>(2l, 10l)); - vertices.add(new Vertex<Long, Long>(3l, 10l)); - vertices.add(new Vertex<Long, Long>(4l, 10l)); - vertices.add(new Vertex<Long, Long>(5l, 0l)); - vertices.add(new Vertex<Long, Long>(6l, 20l)); - vertices.add(new Vertex<Long, Long>(7l, 20l)); - vertices.add(new Vertex<Long, Long>(8l, 20l)); - vertices.add(new Vertex<Long, Long>(9l, 20l)); - - return env.fromCollection(vertices); - } - - public static final DataSet<Edge<Long, NullValue>> getTieEdgeDataSet(ExecutionEnvironment env) { - - List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>(); - edges.add(new Edge<Long, NullValue>(1L, 5L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(2L, 5L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(4L, 5L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(5L, 5L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(6L, 5L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(7L, 5L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(8L, 5L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(9L, 5L, NullValue.getInstance())); - - return env.fromCollection(edges); - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java deleted file mode 100644 index 3a97d1f..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.example.utils; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple3; - -/** - * Provides the default data sets used for the Music Profiles example program. - * If no parameters are given to the program, the default data sets are used. - */ -public class MusicProfilesData { - - public static DataSet<Tuple3<String, String, Integer>> getUserSongTriplets(ExecutionEnvironment env) { - List<Tuple3<String, String, Integer>> triplets = new ArrayList<Tuple3<String, String, Integer>>(); - - triplets.add(new Tuple3<String, String, Integer>("user_1", "song_1", 100)); - triplets.add(new Tuple3<String, String, Integer>("user_1", "song_2", 10)); - triplets.add(new Tuple3<String, String, Integer>("user_1", "song_3", 20)); - triplets.add(new Tuple3<String, String, Integer>("user_1", "song_4", 30)); - triplets.add(new Tuple3<String, String, Integer>("user_1", "song_5", 1)); - - triplets.add(new Tuple3<String, String, Integer>("user_2", "song_6", 40)); - triplets.add(new Tuple3<String, String, Integer>("user_2", "song_7", 10)); - triplets.add(new Tuple3<String, String, Integer>("user_2", "song_8", 3)); - - triplets.add(new Tuple3<String, String, Integer>("user_3", "song_1", 100)); - triplets.add(new Tuple3<String, String, Integer>("user_3", "song_2", 10)); - triplets.add(new Tuple3<String, String, Integer>("user_3", "song_3", 20)); - triplets.add(new Tuple3<String, String, Integer>("user_3", "song_8", 30)); - triplets.add(new Tuple3<String, String, Integer>("user_3", "song_9", 1)); - triplets.add(new Tuple3<String, String, Integer>("user_3", "song_10", 8)); - triplets.add(new Tuple3<String, String, Integer>("user_3", "song_11", 90)); - triplets.add(new Tuple3<String, String, Integer>("user_3", "song_12", 30)); - triplets.add(new Tuple3<String, String, Integer>("user_3", "song_13", 34)); - triplets.add(new Tuple3<String, String, Integer>("user_3", "song_14", 17)); - - triplets.add(new Tuple3<String, String, Integer>("user_4", "song_1", 100)); - triplets.add(new Tuple3<String, String, Integer>("user_4", "song_6", 10)); - triplets.add(new Tuple3<String, String, Integer>("user_4", "song_8", 20)); - triplets.add(new Tuple3<String, String, Integer>("user_4", "song_12", 30)); - triplets.add(new Tuple3<String, String, Integer>("user_4", "song_13", 1)); - triplets.add(new Tuple3<String, String, Integer>("user_4", "song_15", 1)); - - triplets.add(new Tuple3<String, String, Integer>("user_5", "song_3", 300)); - triplets.add(new Tuple3<String, String, Integer>("user_5", "song_4", 4)); - triplets.add(new Tuple3<String, String, Integer>("user_5", "song_5", 5)); - triplets.add(new Tuple3<String, String, Integer>("user_5", "song_8", 8)); - triplets.add(new Tuple3<String, String, Integer>("user_5", "song_9", 9)); - triplets.add(new Tuple3<String, String, Integer>("user_5", "song_10", 10)); - triplets.add(new Tuple3<String, String, Integer>("user_5", "song_12", 12)); - triplets.add(new Tuple3<String, String, Integer>("user_5", "song_13", 13)); - triplets.add(new Tuple3<String, String, Integer>("user_5", "song_15", 15)); - - triplets.add(new Tuple3<String, String, Integer>("user_6", "song_6", 30)); - - return env.fromCollection(triplets); - } - - public static DataSet<String> getMismatches(ExecutionEnvironment env) { - List<String> errors = new ArrayList<String>(); - errors.add("ERROR: <song_8 track_8> Sever"); - errors.add("ERROR: <song_15 track_15> Black Trees"); - return env.fromCollection(errors); - } - - public static final String USER_SONG_TRIPLETS = "user_1 song_1 100\n" + "user_1 song_5 200\n" - + "user_2 song_1 10\n" + "user_2 song_4 20\n" - + "user_3 song_2 3\n" - + "user_4 song_2 1\n" + "user_4 song_3 2\n" - + "user_5 song_3 30"; - - public static final String MISMATCHES = "ERROR: <song_5 track_8> Angie"; - - public static final String MAX_ITERATIONS = "2"; - - public static final String TOP_SONGS_RESULT = "user_1 song_1\n" + - "user_2 song_4\n" + - "user_3 song_2\n" + - "user_4 song_3\n" + - "user_5 song_3"; - - public static final String COMMUNITIES_RESULT = "user_1 1\n" + - "user_2 1\n" + - "user_3 3\n" + - "user_4 3\n" + - "user_5 4"; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java deleted file mode 100644 index 58d4f8b..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.example.utils; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; - -/** - * Provides the default data set used for the PageRank test program. - * If no parameters are given to the program, the default edge data set is used. - */ -public class PageRankData { - - public static final String EDGES = "2 1\n" + - "5 2\n" + - "5 4\n" + - "4 3\n" + - "4 2\n" + - "1 4\n" + - "1 2\n" + - "1 3\n" + - "3 5\n"; - - - public static final String RANKS_AFTER_3_ITERATIONS = "1,0.237\n" + - "2,0.248\n" + - "3,0.173\n" + - "4,0.175\n" + - "5,0.165\n"; - - private PageRankData() {} - - public static final DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) { - - List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>(); - edges.add(new Edge<Long, Double>(2L, 1L, 1.0)); - edges.add(new Edge<Long, Double>(5L, 2L, 1.0)); - edges.add(new Edge<Long, Double>(5L, 4L, 1.0)); - edges.add(new Edge<Long, Double>(4L, 3L, 1.0)); - edges.add(new Edge<Long, Double>(4L, 2L, 1.0)); - edges.add(new Edge<Long, Double>(1L, 4L, 1.0)); - edges.add(new Edge<Long, Double>(1L, 2L, 1.0)); - edges.add(new Edge<Long, Double>(1L, 3L, 1.0)); - edges.add(new Edge<Long, Double>(3L, 5L, 1.0)); - - return env.fromCollection(edges); - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java deleted file mode 100644 index 6b985c5..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.example.utils; - -import java.util.LinkedList; -import java.util.List; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; - -/** - * Provides the default data set used for the Single Source Shortest Paths example program. - * If no parameters are given to the program, the default edge data set is used. - */ -public class SingleSourceShortestPathsData { - - public static final Long SRC_VERTEX_ID = 1L; - - public static final String EDGES = "1\t2\t12.0\n" + "1\t3\t13.0\n" + "2\t3\t23.0\n" + "3\t4\t34.0\n" + "3\t5\t35.0\n" + - "4\t5\t45.0\n" + "5\t1\t51.0"; - - public static final Object[][] DEFAULT_EDGES = new Object[][] { - new Object[]{1L, 2L, 12.0}, - new Object[]{1L, 3L, 13.0}, - new Object[]{2L, 3L, 23.0}, - new Object[]{3L, 4L, 34.0}, - new Object[]{3L, 5L, 35.0}, - new Object[]{4L, 5L, 45.0}, - new Object[]{5L, 1L, 51.0} - }; - - public static final String RESULTED_SINGLE_SOURCE_SHORTEST_PATHS = "1,0.0\n" + "2,12.0\n" + "3,13.0\n" + - "4,47.0\n" + "5,48.0"; - - public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) { - - List<Edge<Long, Double>> edgeList = new LinkedList<Edge<Long, Double>>(); - for (Object[] edge : DEFAULT_EDGES) { - edgeList.add(new Edge<Long, Double>((Long) edge[0], (Long) edge[1], (Double) edge[2])); - } - return env.fromCollection(edgeList); - } - - private SingleSourceShortestPathsData() {} -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SummarizationData.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SummarizationData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SummarizationData.java deleted file mode 100644 index 88f76cc..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SummarizationData.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.example.utils; - -import com.google.common.collect.Lists; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Vertex; -import org.apache.flink.types.NullValue; - -import java.util.List; - -/** - * Provides the default data set used for Summarization tests. - */ -public class SummarizationData { - - private SummarizationData() {} - - /** - * The resulting vertex id can be any id of the vertices summarized by the single vertex. - * - * Format: - * - * "possible-id[,possible-id];group-value,group-count" - */ - public static final String[] EXPECTED_VERTICES = new String[] { - "0,1;A,2", - "2,3,4;B,3", - "5;C,1" - }; - - /** - * Format: - * - * "possible-source-id[,possible-source-id];possible-target-id[,possible-target-id];group-value,group-count" - */ - public static final String[] EXPECTED_EDGES_WITH_VALUES = new String[] { - "0,1;0,1;A,2", - "0,1;2,3,4;A,1", - "2,3,4;0,1;A,1", - "2,3,4;0,1;C,2", - "2,3,4;2,3,4;B,2", - "5;2,3,4;D,2" - }; - - /** - * Format: - * - * "possible-source-id[,possible-source-id];possible-target-id[,possible-target-id];group-value,group-count" - */ - public static final String[] EXPECTED_EDGES_ABSENT_VALUES = new String[] { - "0,1;0,1;(null),2", - "0,1;2,3,4;(null),1", - "2,3,4;0,1;(null),3", - "2,3,4;2,3,4;(null),2", - "5;2,3,4;(null),2" - }; - - /** - * Creates a set of vertices with attached {@link String} values. - * - * @param env execution environment - * @return vertex data set with string values - */ - public static DataSet<Vertex<Long, String>> getVertices(ExecutionEnvironment env) { - List<Vertex<Long, String>> vertices = Lists.newArrayListWithExpectedSize(6); - vertices.add(new Vertex<>(0L, "A")); - vertices.add(new Vertex<>(1L, "A")); - vertices.add(new Vertex<>(2L, "B")); - vertices.add(new Vertex<>(3L, "B")); - vertices.add(new Vertex<>(4L, "B")); - vertices.add(new Vertex<>(5L, "C")); - - return env.fromCollection(vertices); - } - - /** - * Creates a set of edges with attached {@link String} values. - * - * @param env execution environment - * @return edge data set with string values - */ - public static DataSet<Edge<Long, String>> getEdges(ExecutionEnvironment env) { - List<Edge<Long, String>> edges = Lists.newArrayListWithExpectedSize(10); - edges.add(new Edge<>(0L, 1L, "A")); - edges.add(new Edge<>(1L, 0L, "A")); - edges.add(new Edge<>(1L, 2L, "A")); - edges.add(new Edge<>(2L, 1L, "A")); - edges.add(new Edge<>(2L, 3L, "B")); - edges.add(new Edge<>(3L, 2L, "B")); - edges.add(new Edge<>(4L, 0L, "C")); - edges.add(new Edge<>(4L, 1L, "C")); - edges.add(new Edge<>(5L, 2L, "D")); - edges.add(new Edge<>(5L, 3L, "D")); - - return env.fromCollection(edges); - } - - /** - * Creates a set of edges with {@link NullValue} as edge value. - * - * @param env execution environment - * @return edge data set with null values - */ - @SuppressWarnings("serial") - public static DataSet<Edge<Long, NullValue>> getEdgesWithAbsentValues(ExecutionEnvironment env) { - return getEdges(env).map(new MapFunction<Edge<Long, String>, Edge<Long, NullValue>>() { - @Override - public Edge<Long, NullValue> map(Edge<Long, String> value) throws Exception { - return new Edge<>(value.getSource(), value.getTarget(), NullValue.getInstance()); - } - }); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java deleted file mode 100644 index c8cea12..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.example.utils; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.graph.Edge; -import org.apache.flink.types.NullValue; - -import java.util.ArrayList; -import java.util.List; - -/** - * Provides the default data sets used for the Triangle Count test program. - * If no parameters are given to the program, the default data sets are used. - */ -public class TriangleCountData { - - public static final String EDGES = "1 2\n"+"1 3\n"+"2 3\n"+"2 6\n"+"3 4\n"+"3 5\n"+"3 6\n"+"4 5\n"+"6 7\n"; - - public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) { - - List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>(); - edges.add(new Edge<Long, NullValue>(1L, 2L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(1L, 3L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(2L, 3L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(2L, 6L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(3L, 4L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(3L, 5L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(3L, 6L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(4L, 5L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(6L, 7L, NullValue.getInstance())); - - return env.fromCollection(edges); - } - - public static final String RESULTED_NUMBER_OF_TRIANGLES = "3"; - - public static List<Tuple3<Long,Long,Long>> getListOfTriangles() { - ArrayList<Tuple3<Long,Long,Long>> ret = new ArrayList<>(3); - ret.add(new Tuple3<>(1L,2L,3L)); - ret.add(new Tuple3<>(2L,3L,6L)); - ret.add(new Tuple3<>(3L,4L,5L)); - return ret; - } - - private TriangleCountData () {} -}
