[
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15291243#comment-15291243
]
ASF GitHub Bot commented on FLINK-3780:
---------------------------------------
Github user vasia commented on a diff in the pull request:
https://github.com/apache/flink/pull/1980#discussion_r63895157
--- Diff:
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
---
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.similarity;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import
org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Jaccard Index measures the similarity between vertex neighborhoods.
+ * Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are
common).
+ * <br/>
+ * This implementation produces similarity scores for each pair of vertices
+ * in the graph with at least one common neighbor; equivalently, this is
the
+ * set of all non-zero Jaccard Similarity coefficients.
+ * <br/>
+ * The input graph must be a simple, undirected graph containing no
duplicate
+ * edges or self-loops.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class JaccardIndex<K extends CopyableValue<K>, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
+
+ public static final int DEFAULT_GROUP_SIZE = 64;
+
+ // Optional configuration
+ private int groupSize = DEFAULT_GROUP_SIZE;
+
+ private boolean unboundedScores = true;
+
+ private int minimumScoreNumerator = 0;
+
+ private int minimumScoreDenominator = 1;
+
+ private int maximumScoreNumerator = 1;
+
+ private int maximumScoreDenominator = 0;
+
+ private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+ /**
+ * Override the default group size for the quadratic expansion of
neighbor
+ * pairs. Small groups generate more data whereas large groups
distribute
+ * computation less evenly among tasks.
+ *
+ * @param groupSize the group size for the quadratic expansion of
neighbor pairs
+ * @return this
+ */
+ public JaccardIndex<K, VV, EV> setGroupSize(int groupSize) {
+ Preconditions.checkArgument(groupSize > 0, "Group size must be
greater than zero");
+
+ this.groupSize = groupSize;
+
+ return this;
+ }
+
+ /**
+ * Filter out Jaccard Index scores less than the given minimum fraction.
+ *
+ * @param numerator numerator of the minimum score
+ * @param denominator denominator of the minimum score
+ * @return this
+ * @see #setMaximumScore(int, int)
+ */
+ public JaccardIndex<K, VV, EV> setMinimumScore(int numerator, int
denominator) {
+ Preconditions.checkArgument(numerator >= 0, "Minimum score
numerator must be non-negative");
+ Preconditions.checkArgument(denominator > 0, "Minimum score
denominator must be greater than zero");
+ Preconditions.checkArgument(numerator <= denominator, "Minimum
score fraction must be less than or equal to one");
+
+ this.unboundedScores = false;
+ this.minimumScoreNumerator = numerator;
+ this.minimumScoreDenominator = denominator;
+
+ return this;
+ }
+
+ /**
+ * Filter out Jaccard Index scores greater than or equal to the given
maximum fraction.
+ *
+ * @param numerator numerator of the maximum score
+ * @param denominator denominator of the maximum score
+ * @return this
+ * @see #setMinimumScore(int, int)
+ */
+ public JaccardIndex<K, VV, EV> setMaximumScore(int numerator, int
denominator) {
+ Preconditions.checkArgument(numerator >= 0, "Maximum score
numerator must be non-negative");
+ Preconditions.checkArgument(denominator > 0, "Maximum score
denominator must be greater than zero");
+ Preconditions.checkArgument(numerator <= denominator, "Maximum
score fraction must be less than or equal to one");
+
+ this.unboundedScores = false;
+ this.maximumScoreNumerator = numerator;
+ this.maximumScoreDenominator = denominator;
+
+ return this;
+ }
+
+ /**
+ * Override the parallelism of operators processing small amounts of
data.
+ *
+ * @param littleParallelism operator parallelism
+ * @return this
+ */
+ public JaccardIndex<K, VV, EV> setLittleParallelism(int
littleParallelism) {
+ this.littleParallelism = littleParallelism;
+
+ return this;
+ }
+
+ /*
+ * Implementation notes:
+ *
+ * The requirement that "K extends CopyableValue<K>" can be removed when
+ * Flink has a self-join which performs the skew distribution handled
by
+ * GenerateGroupSpans / GenerateGroups / GenerateGroupPairs.
+ */
+
+ @Override
+ public DataSet<Result<K>> run(Graph<K, VV, EV> input)
+ throws Exception {
+ // s, t, d(t)
+ DataSet<Edge<K, Tuple2<EV, LongValue>>> neighborDegree = input
+ .run(new EdgeTargetDegree<K, VV, EV>()
+ .setParallelism(littleParallelism));
+
+ // group span, s, t, d(t)
+ DataSet<Tuple4<IntValue, K, K, IntValue>> groupSpans =
neighborDegree
+ .groupBy(0)
+ .sortGroup(1, Order.ASCENDING)
+ .reduceGroup(new GenerateGroupSpans<K, EV>(groupSize))
+ .setParallelism(littleParallelism)
+ .name("Generate group spans");
+
+ // group, s, t, d(t)
+ DataSet<Tuple4<IntValue, K, K, IntValue>> groups = groupSpans
+ .rebalance()
+ .setParallelism(littleParallelism)
+ .name("Rebalance")
+ .flatMap(new GenerateGroups<K>())
+ .setParallelism(littleParallelism)
+ .name("Generate groups");
+
+ // t, u, d(t)+d(u)
+ DataSet<Tuple3<K, K, IntValue>> twoPaths = groups
+ .groupBy(0, 1)
+ .sortGroup(2, Order.ASCENDING)
+ .reduceGroup(new GenerateGroupPairs<K>(groupSize))
+ .name("Generate group pairs");
+
+ // t, u, intersection, union
+ return twoPaths
+ .groupBy(0, 1)
+ .reduceGroup(new ComputeScores<K>(unboundedScores,
+ minimumScoreNumerator,
minimumScoreDenominator,
+ maximumScoreNumerator,
maximumScoreDenominator))
+ .name("Compute scores");
+ }
+
+ /**
+ * This is the first of three operations implementing a self-join to
generate
+ * the full neighbor pairing for each vertex. The number of neighbor
pairs
+ * is (n choose 2) which is quadratic in the vertex degree.
+ * <br/>
+ * The third operation, {@link GenerateGroupPairs}, processes groups of
size
+ * {@link #groupSize} and emits {@code O(groupSize * deg(vertex))}
pairs.
+ * <br/>
+ * The input to the third operation is still quadratic in the vertex
degree.
+ * Two prior operations, {@link GenerateGroupSpans} and {@link
GenerateGroups},
+ * each emit datasets linear in the vertex degree, with a forced
rebalance
+ * in between. {@link GenerateGroupSpans} first annotates each edge
with the
+ * number of groups and {@link GenerateGroups} emits each edge into
each group.
--- End diff --
👍 for the detailed comment!
> Jaccard Similarity
> ------------------
>
> Key: FLINK-3780
> URL: https://issues.apache.org/jira/browse/FLINK-3780
> Project: Flink
> Issue Type: New Feature
> Components: Gelly
> Affects Versions: 1.1.0
> Reporter: Greg Hogan
> Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Implement a Jaccard Similarity algorithm computing all non-zero similarity
> scores. This algorithm is similar to {{TriangleListing}} but instead of
> joining two-paths against an edge list we count two-paths.
> {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}} which
> relies on {{Graph.getTriplets()}} so only computes similarity scores for
> neighbors but not neighbors-of-neighbors.
> This algorithm is easily modified for other similarity scores such as
> Adamic-Adar similarity where the sum of endpoint degrees is replaced by the
> degree of the middle vertex.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)