[
https://issues.apache.org/jira/browse/FLINK-1528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14351544#comment-14351544
]
ASF GitHub Bot commented on FLINK-1528:
---------------------------------------
Github user vasia commented on a diff in the pull request:
https://github.com/apache/flink/pull/420#discussion_r25995666
--- Diff:
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LocalClusteringCoefficientExample.java
---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.EdgesFunction;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.LocalClusteringCoefficientData;
+import org.apache.flink.types.NullValue;
+
+import java.util.HashSet;
+
+public class LocalClusteringCoefficientExample implements
ProgramDescription {
+
+ //
--------------------------------------------------------------------------------------------
+ // Program
+ //
--------------------------------------------------------------------------------------------
+
+ public static void main (String [] args) throws Exception {
+
+ if(!parseParameters(args)) {
+ return;
+ }
+
+ ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
+ Graph<Long, NullValue, NullValue> graph =
Graph.fromDataSet(edges, env);
+
+ // Get the neighbors of each vertex in a HashSet
+ DataSet<Tuple2<Long, HashSet<Long>>> neighborhoods = graph
+ .reduceOnEdges(new NeighborhoodEdgesFunction(),
EdgeDirection.OUT);
+
+ // Construct a new graph where the neighborhood is the vertex
value
+ Graph<Long, HashSet<Long>, NullValue> newGraph = graph
+ .mapVertices(new EmptyVertexMapFunction())
+ .joinWithVertices(neighborhoods, new
NeighborhoodVertexMapFunction());
+
+ // Calculate clustering coefficient
+ DataSet<Tuple2<Long, Double>> clusteringCoefficients = newGraph
+ .reduceOnNeighbors(new
ClusteringCoefficientNeighborsFunction(), EdgeDirection.OUT);
+
+ // Emit results
+ if(fileOutput) {
+ clusteringCoefficients.writeAsCsv(outputPath, "\n",
",");
+ } else {
+ clusteringCoefficients.print();
+ }
+
+ env.execute("Local Clustering Coefficient Example");
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Clustering Coefficient Functions
+ //
--------------------------------------------------------------------------------------------
+
+ private static final class NeighborhoodEdgesFunction
+ implements EdgesFunction<Long, NullValue, Tuple2<Long,
HashSet<Long>>> {
+
+ @Override
+ public Tuple2<Long, HashSet<Long>> iterateEdges(
+ Iterable<Tuple2<Long, Edge<Long, NullValue>>>
edges) throws Exception {
+
+ Long vertexId = null;
+ HashSet<Long> neighbors = new HashSet<Long>();
+
+ for (Tuple2<Long, Edge<Long, NullValue>> edge : edges) {
+ vertexId = edge.f0;
+ neighbors.add(edge.f1.getTarget());
+ }
+
+ return new Tuple2<Long, HashSet<Long>>(vertexId,
neighbors);
+ }
+ }
+
+ private static final class EmptyVertexMapFunction
+ implements MapFunction<Vertex<Long, NullValue>,
HashSet<Long>> {
+
+ @Override
+ public HashSet<Long> map(Vertex<Long, NullValue> arg) throws
Exception {
+ return new HashSet<Long>();
+ }
+ }
+
+ private static final class NeighborhoodVertexMapFunction
+ implements MapFunction<Tuple2<HashSet<Long>,
HashSet<Long>>, HashSet<Long>> {
+
+ @Override
+ public HashSet<Long> map(Tuple2<HashSet<Long>, HashSet<Long>>
arg) throws Exception {
+ return arg.f1;
+ }
+ }
+
+ private static final class ClusteringCoefficientNeighborsFunction
+ implements NeighborsFunctionWithVertexValue<Long,
HashSet<Long>, NullValue, Tuple2<Long, Double>> {
+
+ @Override
+ public Tuple2<Long, Double> iterateNeighbors(Vertex<Long,
HashSet<Long>> vertex,
+ Iterable<Tuple2<Edge<Long, NullValue>,
Vertex<Long, HashSet<Long>>>> neighbors) throws Exception {
+
+ int deg = vertex.getValue().size();
+ int e = 0;
+
+ // Calculate common neighbor count (e)
+ for (Tuple2<Edge<Long, NullValue>, Vertex<Long,
HashSet<Long>>> neighbor : neighbors) {
+ // Iterate neighbor's neighbors
+ for (Long nn : neighbor.f1.getValue()) {
+ if (vertex.getValue().contains(nn)) {
+ e++;
+ }
+ }
+ }
+
+ // We assume an undirected graph, so we need to divide
e here
+ e /= 2;
--- End diff --
I don't follow the reasoning behind this. Do you assume an undirected graph
as input, or do you convert the input to an undirected graph? In any case, I
think we should add a comment at the beginning of the example to define the
behavior.
> Add local clustering coefficient library method and example
> -----------------------------------------------------------
>
> Key: FLINK-1528
> URL: https://issues.apache.org/jira/browse/FLINK-1528
> Project: Flink
> Issue Type: Task
> Components: Gelly
> Reporter: Vasia Kalavri
> Assignee: Daniel Bali
>
> Add a Gelly library method and example to compute the local clustering
> coefficient.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)