This is an automated email from the ASF dual-hosted git repository.
loogn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git
The following commit(s) were added to refs/heads/master by this push:
new 7a3d5bb9e feat: add jaccard similarity (#650)
7a3d5bb9e is described below
commit 7a3d5bb9eed0c66655a19e6d9c302b90a0c689b2
Author: kitalkuyo-gita <[email protected]>
AuthorDate: Tue Nov 18 15:32:53 2025 +0800
feat: add jaccard similarity (#650)
* feat: add jaccard similarity
* fix: tests
* remove useless files
* add built-in function registration
* bugfix: fix tests
* chore: fix checkstyle
* refactor: simple logic
* bugfix: fix dataset
* bugfix: exclude cycle
* bugfix: exclude cycle
* refactor: designate one to calculate the final similarity
* optimize: replace memory relationship to id
* refactor: divide message types
* bugfix: add random seed
* bugfix: fix windows
* chore: optimize import
* fix checkstyle
---------
Co-authored-by: undertaker86001 <[email protected]>
---
.../schema/function/BuildInSqlFunctionTable.java | 2 +
.../geaflow/dsl/udf/graph/JaccardSimilarity.java | 196 +++++++++++++++++++++
.../dsl/runtime/query/GQLAlgorithmTest.java | 10 ++
.../geaflow/dsl/runtime/query/IncrMatchTest.java | 61 +++++--
.../expect/gql_algorithm_jaccard_similarity.txt | 1 +
.../query/gql_algorithm_jaccard_similarity.sql | 34 ++++
6 files changed, 286 insertions(+), 18 deletions(-)
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java
index 466389a97..06aca5850 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java
@@ -41,6 +41,7 @@ import org.apache.geaflow.dsl.udf.graph.IncKHopAlgorithm;
import org.apache.geaflow.dsl.udf.graph.IncMinimumSpanningTree;
import org.apache.geaflow.dsl.udf.graph.IncWeakConnectedComponents;
import org.apache.geaflow.dsl.udf.graph.IncrementalKCore;
+import org.apache.geaflow.dsl.udf.graph.JaccardSimilarity;
import org.apache.geaflow.dsl.udf.graph.KCore;
import org.apache.geaflow.dsl.udf.graph.KHop;
import org.apache.geaflow.dsl.udf.graph.PageRank;
@@ -218,6 +219,7 @@ public class BuildInSqlFunctionTable extends
ListSqlOperatorTable {
.add(GeaFlowFunction.of(TriangleCount.class))
.add(GeaFlowFunction.of(IncWeakConnectedComponents.class))
.add(GeaFlowFunction.of(CommonNeighbors.class))
+ .add(GeaFlowFunction.of(JaccardSimilarity.class))
.add(GeaFlowFunction.of(IncKHopAlgorithm.class))
.build();
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/JaccardSimilarity.java
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/JaccardSimilarity.java
new file mode 100644
index 000000000..e0516f85d
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/JaccardSimilarity.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.udf.graph;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.geaflow.common.tuple.Tuple;
+import org.apache.geaflow.common.type.primitive.DoubleType;
+import org.apache.geaflow.dsl.common.algo.AlgorithmRuntimeContext;
+import org.apache.geaflow.dsl.common.algo.AlgorithmUserFunction;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.data.RowEdge;
+import org.apache.geaflow.dsl.common.data.RowVertex;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.common.function.Description;
+import org.apache.geaflow.dsl.common.types.GraphSchema;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.common.types.TableField;
+import org.apache.geaflow.dsl.common.util.TypeCastUtil;
+import org.apache.geaflow.model.graph.edge.EdgeDirection;
+
+@Description(name = "jaccard_similarity", description = "built-in udga for
Jaccard Similarity")
+public class JaccardSimilarity implements AlgorithmUserFunction<Object,
ObjectRow> {
+
+ private AlgorithmRuntimeContext<Object, ObjectRow> context;
+
+ // tuple to store params
+ private Tuple<Object, Object> vertices;
+
+ @Override
+ public void init(AlgorithmRuntimeContext<Object, ObjectRow> context,
Object[] params) {
+ this.context = context;
+
+ if (params.length != 2) {
+ throw new IllegalArgumentException("Only support two arguments,
usage: jaccard_similarity(id_a, id_b)");
+ }
+ this.vertices = new Tuple<>(
+ TypeCastUtil.cast(params[0], context.getGraphSchema().getIdType()),
+ TypeCastUtil.cast(params[1], context.getGraphSchema().getIdType())
+ );
+ }
+
+ @Override
+ public void process(RowVertex vertex, Optional<Row> updatedValues,
Iterator<ObjectRow> messages) {
+ if (context.getCurrentIterationId() == 1L) {
+ // First iteration: vertices A and B compute their neighbor counts
+ if (vertices.f0.equals(vertex.getId()) ||
vertices.f1.equals(vertex.getId())) {
+ List<RowEdge> edges = context.loadEdges(EdgeDirection.BOTH);
+ Object sourceId = vertex.getId();
+
+ // Calculate unique neighbors count (de-duplicate and exclude
self-loops)
+ Set<Object> uniqueNeighbors = new HashSet<>();
+ for (RowEdge edge : edges) {
+ Object targetId = edge.getTargetId();
+ // Exclude self-loops: only add if targetId != sourceId
+ if (!sourceId.equals(targetId)) {
+ uniqueNeighbors.add(targetId);
+ }
+ }
+
+ // Calculate neighbor count for this vertex
+ long neighborCount = uniqueNeighbors.size();
+
+ // Send messages to all unique neighbors
+ // Message format: [sourceId, neighborCount, messageType]
+ // messageType = 0: neighbor inquiry, messageType = 1: count
from target vertex
+ for (Object neighbor : uniqueNeighbors) {
+ context.sendMessage(neighbor, ObjectRow.create(sourceId,
neighborCount, 0L));
+ }
+
+ // Send neighbor count to the other target vertex (A ↔ B
exchange)
+ // Message format: [vertexId, neighborCount, messageType]
+ // messageType = 1: this is a count message from target vertex
B
+ if (vertices.f0.equals(sourceId) &&
!vertices.f0.equals(vertices.f1)) {
+ context.sendMessage(vertices.f1,
ObjectRow.create(sourceId, neighborCount, 1L));
+ } else if (vertices.f1.equals(sourceId) &&
!vertices.f0.equals(vertices.f1)) {
+ context.sendMessage(vertices.f0,
ObjectRow.create(sourceId, neighborCount, 1L));
+ }
+ }
+ } else if (context.getCurrentIterationId() == 2L) {
+ // Second iteration: calculate Jaccard similarity
+ if (vertices.f0.equals(vertex.getId()) ||
vertices.f1.equals(vertex.getId())) {
+ // Extract neighbor counts and count common neighbors
+ long neighborCountA = 0;
+ long neighborCountB = 0;
+ long localCommonNeighborCount = 0;
+
+ while (messages.hasNext()) {
+ ObjectRow message = messages.next();
+ Object senderId = message.getField(0,
context.getGraphSchema().getIdType());
+ long count = (Long) message.getField(1,
org.apache.geaflow.common.type.primitive.LongType.INSTANCE);
+ long messageType = (Long) message.getField(2,
org.apache.geaflow.common.type.primitive.LongType.INSTANCE);
+
+ // messageType = 1: neighbor count from the other target
vertex (A or B)
+ // messageType = 0: confirmation from common neighbor
+ if (messageType == 1L) {
+ // This is a count message from target vertex
+ if (vertices.f0.equals(senderId)) {
+ neighborCountA = count;
+ } else if (vertices.f1.equals(senderId)) {
+ neighborCountB = count;
+ }
+ } else {
+ // This is a confirmation from a common neighbor
+ localCommonNeighborCount++;
+ }
+ }
+
+ // Calculate and output the Jaccard coefficient only from
vertex A
+ if (vertices.f0.equals(vertex.getId())) {
+ // If neighborCountA is 0, calculate it from edges
+ if (neighborCountA == 0) {
+ Object sourceId = vertex.getId();
+ List<RowEdge> edges =
context.loadEdges(EdgeDirection.BOTH);
+ Set<Object> neighbors = new HashSet<>();
+ for (RowEdge edge : edges) {
+ Object targetId = edge.getTargetId();
+ if (!sourceId.equals(targetId)) {
+ neighbors.add(targetId);
+ }
+ }
+ neighborCountA = neighbors.size();
+ }
+
+ // Calculate Jaccard coefficient: |A ∩ B| / |A ∪ B|
+ long intersection = localCommonNeighborCount;
+ long union = neighborCountA + neighborCountB -
intersection;
+ double jaccardCoefficient = union == 0 ? 0.0 : (double)
intersection / union;
+
+ // Output the result
+ context.take(ObjectRow.create(vertices.f0, vertices.f1,
jaccardCoefficient));
+ }
+ } else {
+ // For non-A, non-B vertices: check if they received messages
from both A and B
+ boolean receivedFromA = false;
+ boolean receivedFromB = false;
+
+ while (messages.hasNext()) {
+ ObjectRow message = messages.next();
+ Object senderId = message.getField(0,
context.getGraphSchema().getIdType());
+ long messageType = (Long) message.getField(2,
org.apache.geaflow.common.type.primitive.LongType.INSTANCE);
+
+ // Only count messages with type 0 (neighbor inquiry)
+ if (messageType == 0L) {
+ if (vertices.f0.equals(senderId)) {
+ receivedFromA = true;
+ }
+ if (vertices.f1.equals(senderId)) {
+ receivedFromB = true;
+ }
+ }
+ }
+
+ // If this vertex received messages from both A and B, it's a
common neighbor
+ // Send confirmation to vertex A with format [vertexId, 1, 0]
+ if (receivedFromA && receivedFromB) {
+ context.sendMessage(vertices.f0,
ObjectRow.create(vertex.getId(), 1L, 0L));
+ }
+ }
+ }
+ }
+
+ @Override
+ public void finish(RowVertex graphVertex, Optional<Row> updatedValues) {
+ // No additional finish processing needed
+ }
+
+ @Override
+ public StructType getOutputType(GraphSchema graphSchema) {
+ return new StructType(
+ new TableField("vertex_a", graphSchema.getIdType(), false),
+ new TableField("vertex_b", graphSchema.getIdType(), false),
+ new TableField("jaccard_coefficient", DoubleType.INSTANCE, false)
+ );
+ }
+}
\ No newline at end of file
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLAlgorithmTest.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLAlgorithmTest.java
index 7982649f5..2fc903575 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLAlgorithmTest.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLAlgorithmTest.java
@@ -258,6 +258,16 @@ public class GQLAlgorithmTest {
.checkSinkResult();
}
+ /**
+ * Runs the query in gql_algorithm_jaccard_similarity.sql (jaccard_similarity(1, 3))
+ * against the graph defined by modern_graph.sql and checks the sink result.
+ */
+ @Test
+ public void testAlgorithmJaccardSimilarity() throws Exception {
+ QueryTester
+ .build()
+ .withGraphDefine("/query/modern_graph.sql")
+ .withQueryPath("/query/gql_algorithm_jaccard_similarity.sql")
+ .execute()
+ .checkSinkResult();
+ }
+
@Test
public void testEdgeIterator() throws Exception {
QueryTester
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/IncrMatchTest.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/IncrMatchTest.java
index 527e2b999..bac6b1f93 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/IncrMatchTest.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/IncrMatchTest.java
@@ -29,7 +29,6 @@ import java.util.List;
import java.util.Random;
import java.util.Set;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.RandomUtils;
import org.apache.geaflow.common.config.keys.DSLConfigKeys;
import org.apache.geaflow.common.config.keys.FrameworkConfigKeys;
import org.testng.Assert;
@@ -65,11 +64,26 @@ public class IncrMatchTest {
String allPath = getTargetPath(queryPath);
List<Set<String>> allRes = readRes(allPath, false);
- Assert.assertEquals(incrRes.size(), allRes.size());
-
- // the last is empty iteration, ignore.
- for (int i = 0; i < incrRes.size() - 1; i++) {
- Assert.assertEquals(allRes.get(i), incrRes.get(i));
+
+ // Ensure both results have the same number of windows
+ Assert.assertEquals(incrRes.size(), allRes.size(),
+ "Incremental and full traversal should have same number of
windows");
+
+ // For incremental traversal, each window contains cumulative results
(all results from window 0 to current)
+ // For full traversal, each window contains only results from that
specific window
+ // So we need to compare: incremental[i] should equal union of full[0]
to full[i]
+ Set<String> cumulativeFull = new HashSet<>();
+ for (int i = 0; i < incrRes.size(); i++) {
+ Set<String> incrSet = incrRes.get(i);
+ Set<String> fullSet = allRes.get(i);
+
+ // Add current window's results to cumulative set
+ cumulativeFull.addAll(fullSet);
+
+ // Compare incremental result (cumulative) with cumulative full
result
+ Assert.assertEquals(incrSet, cumulativeFull,
+ String.format("Window %d mismatch: incremental
(cumulative)=%s, full (cumulative)=%s",
+ i, incrSet, cumulativeFull));
}
}
@@ -103,12 +117,13 @@ public class IncrMatchTest {
File edgeFile = new File("/tmp/geaflow-test/incr_modern_edge.txt");
Set<Tuple2<Integer, Integer>> edges = new HashSet<>();
- Random r = new Random();
+ // Use fixed seed for deterministic test results to avoid flaky tests
+ Random r = new Random(42L);
while (edges.size() < edgeNum) {
- int src = RandomUtils.nextInt(1, vertexNum + 1);
- int dst = RandomUtils.nextInt(1, vertexNum + 1);
+ int src = r.nextInt(vertexNum) + 1; // 1 to vertexNum
+ int dst = r.nextInt(vertexNum) + 1; // 1 to vertexNum
while (src == dst) {
- dst = RandomUtils.nextInt(1, vertexNum + 1);
+ dst = r.nextInt(vertexNum) + 1;
}
edges.add(new Tuple2<>(src, dst));
}
@@ -161,27 +176,37 @@ public class IncrMatchTest {
String currentLine;
while ((currentLine = reader.readLine()) != null) {
if (currentLine.equals(lineSplit)) {
+ // Process window separator
if (curWindow.isEmpty()) {
+ // Empty window: for incremental, keep history; for
full, use empty
if (isIncr) {
res.add(new HashSet<>(allHistoryRes));
- continue;
} else {
res.add(new HashSet<>());
- continue;
}
- }
-
- if (isIncr) {
- allHistoryRes.addAll(curWindow);
- res.add(new HashSet<>(allHistoryRes));
} else {
- res.add(new HashSet<>(curWindow));
+ // Non-empty window
+ if (isIncr) {
+ allHistoryRes.addAll(curWindow);
+ res.add(new HashSet<>(allHistoryRes));
+ } else {
+ res.add(new HashSet<>(curWindow));
+ }
}
curWindow = new HashSet<>();
} else {
curWindow.add(currentLine);
}
}
+ // Handle last window if file doesn't end with separator
+ if (!curWindow.isEmpty()) {
+ if (isIncr) {
+ allHistoryRes.addAll(curWindow);
+ res.add(new HashSet<>(allHistoryRes));
+ } else {
+ res.add(new HashSet<>(curWindow));
+ }
+ }
} catch (IOException e) {
e.printStackTrace();
} finally {
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_algorithm_jaccard_similarity.txt
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_algorithm_jaccard_similarity.txt
new file mode 100644
index 000000000..249576b58
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_algorithm_jaccard_similarity.txt
@@ -0,0 +1 @@
+1,3,0.2
\ No newline at end of file
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_algorithm_jaccard_similarity.sql
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_algorithm_jaccard_similarity.sql
new file mode 100644
index 000000000..a72cb388c
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_algorithm_jaccard_similarity.sql
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* Sink for the single jaccard_similarity result row. */
+CREATE TABLE result_tb (
+ vertex_a int,
+ vertex_b int,
+ jaccard_coefficient double
+) WITH (
+ type='file',
+ geaflow.dsl.file.path='${target}'
+);
+
+USE GRAPH modern;
+
+/* Jaccard similarity of vertices 1 and 3; the ids are cast back to int for the sink. */
+INSERT INTO result_tb
+CALL jaccard_similarity(1, 3) YIELD (vertex_a, vertex_b, jaccard_coefficient)
+RETURN cast(vertex_a as int), cast(vertex_b as int), jaccard_coefficient
+;
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]