This is an automated email from the ASF dual-hosted git repository.

loogn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a3d5bb9e feat: add jaccard similarity (#650)
7a3d5bb9e is described below

commit 7a3d5bb9eed0c66655a19e6d9c302b90a0c689b2
Author: kitalkuyo-gita <[email protected]>
AuthorDate: Tue Nov 18 15:32:53 2025 +0800

    feat: add jaccard similarity (#650)
    
    * feat: add jaccard similarity
    
    * fix: tests
    
    * remove useless files
    
    * add build in register function
    
    * bugfix: fix tests
    
    * chore: fix checkstyle
    
    * refactor: simple logic
    
    * bugfix: fix dataset
    
    * bugfix: exclude cycle
    
    * bufix: excclude cycle
    
    * refactor: designate one to calculate the final similarity
    
    * optimize: replace memory relationship to id
    
    * refactor: messge type divide
    
    * bugfix: add random seed
    
    * bugfix: fix windows
    
    * chore: optimize import
    
    * fix checkstyle
    
    ---------
    
    Co-authored-by: undertaker86001 <[email protected]>
---
 .../schema/function/BuildInSqlFunctionTable.java   |   2 +
 .../geaflow/dsl/udf/graph/JaccardSimilarity.java   | 196 +++++++++++++++++++++
 .../dsl/runtime/query/GQLAlgorithmTest.java        |  10 ++
 .../geaflow/dsl/runtime/query/IncrMatchTest.java   |  61 +++++--
 .../expect/gql_algorithm_jaccard_similarity.txt    |   1 +
 .../query/gql_algorithm_jaccard_similarity.sql     |  34 ++++
 6 files changed, 286 insertions(+), 18 deletions(-)

diff --git a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java
index 466389a97..06aca5850 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java
@@ -41,6 +41,7 @@ import org.apache.geaflow.dsl.udf.graph.IncKHopAlgorithm;
 import org.apache.geaflow.dsl.udf.graph.IncMinimumSpanningTree;
 import org.apache.geaflow.dsl.udf.graph.IncWeakConnectedComponents;
 import org.apache.geaflow.dsl.udf.graph.IncrementalKCore;
+import org.apache.geaflow.dsl.udf.graph.JaccardSimilarity;
 import org.apache.geaflow.dsl.udf.graph.KCore;
 import org.apache.geaflow.dsl.udf.graph.KHop;
 import org.apache.geaflow.dsl.udf.graph.PageRank;
@@ -218,6 +219,7 @@ public class BuildInSqlFunctionTable extends ListSqlOperatorTable {
             .add(GeaFlowFunction.of(TriangleCount.class))
             .add(GeaFlowFunction.of(IncWeakConnectedComponents.class))
             .add(GeaFlowFunction.of(CommonNeighbors.class))
+            .add(GeaFlowFunction.of(JaccardSimilarity.class))
             .add(GeaFlowFunction.of(IncKHopAlgorithm.class))
             .build();
 
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/JaccardSimilarity.java b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/JaccardSimilarity.java
new file mode 100644
index 000000000..e0516f85d
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/JaccardSimilarity.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.udf.graph;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.geaflow.common.tuple.Tuple;
+import org.apache.geaflow.common.type.primitive.DoubleType;
+import org.apache.geaflow.dsl.common.algo.AlgorithmRuntimeContext;
+import org.apache.geaflow.dsl.common.algo.AlgorithmUserFunction;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.data.RowEdge;
+import org.apache.geaflow.dsl.common.data.RowVertex;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.common.function.Description;
+import org.apache.geaflow.dsl.common.types.GraphSchema;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.common.types.TableField;
+import org.apache.geaflow.dsl.common.util.TypeCastUtil;
+import org.apache.geaflow.model.graph.edge.EdgeDirection;
+
+@Description(name = "jaccard_similarity", description = "built-in udga for Jaccard Similarity")
+public class JaccardSimilarity implements AlgorithmUserFunction<Object, ObjectRow> {
+
+    private AlgorithmRuntimeContext<Object, ObjectRow> context;
+
+    // tuple to store params
+    private Tuple<Object, Object> vertices;
+
+    @Override
+    public void init(AlgorithmRuntimeContext<Object, ObjectRow> context, 
Object[] params) {
+        this.context = context;
+
+        if (params.length != 2) {
+            throw new IllegalArgumentException("Only support two arguments, usage: jaccard_similarity(id_a, id_b)");
+        }
+        this.vertices = new Tuple<>(
+            TypeCastUtil.cast(params[0], context.getGraphSchema().getIdType()),
+            TypeCastUtil.cast(params[1], context.getGraphSchema().getIdType())
+        );
+    }
+
+    @Override
+    public void process(RowVertex vertex, Optional<Row> updatedValues, 
Iterator<ObjectRow> messages) {
+        if (context.getCurrentIterationId() == 1L) {
+            // First iteration: vertices A and B compute their neighbor counts
+            if (vertices.f0.equals(vertex.getId()) || 
vertices.f1.equals(vertex.getId())) {
+                List<RowEdge> edges = context.loadEdges(EdgeDirection.BOTH);
+                Object sourceId = vertex.getId();
+                
+                // Calculate unique neighbors count (de-duplicate and exclude 
self-loops)
+                Set<Object> uniqueNeighbors = new HashSet<>();
+                for (RowEdge edge : edges) {
+                    Object targetId = edge.getTargetId();
+                    // Exclude self-loops: only add if targetId != sourceId
+                    if (!sourceId.equals(targetId)) {
+                        uniqueNeighbors.add(targetId);
+                    }
+                }
+                
+                // Calculate neighbor count for this vertex
+                long neighborCount = uniqueNeighbors.size();
+                
+                // Send messages to all unique neighbors
+                // Message format: [sourceId, neighborCount, messageType]
+                // messageType = 0: neighbor inquiry, messageType = 1: count 
from target vertex
+                for (Object neighbor : uniqueNeighbors) {
+                    context.sendMessage(neighbor, ObjectRow.create(sourceId, 
neighborCount, 0L));
+                }
+                
+                // Send neighbor count to the other target vertex (A ↔ B 
exchange)
+                // Message format: [vertexId, neighborCount, messageType]
+                // messageType = 1: this is a count message from target vertex 
B
+                if (vertices.f0.equals(sourceId) && 
!vertices.f0.equals(vertices.f1)) {
+                    context.sendMessage(vertices.f1, 
ObjectRow.create(sourceId, neighborCount, 1L));
+                } else if (vertices.f1.equals(sourceId) && 
!vertices.f0.equals(vertices.f1)) {
+                    context.sendMessage(vertices.f0, 
ObjectRow.create(sourceId, neighborCount, 1L));
+                }
+            }
+        } else if (context.getCurrentIterationId() == 2L) {
+            // Second iteration: calculate Jaccard similarity
+            if (vertices.f0.equals(vertex.getId()) || 
vertices.f1.equals(vertex.getId())) {
+                // Extract neighbor counts and count common neighbors
+                long neighborCountA = 0;
+                long neighborCountB = 0;
+                long localCommonNeighborCount = 0;
+                
+                while (messages.hasNext()) {
+                    ObjectRow message = messages.next();
+                    Object senderId = message.getField(0, 
context.getGraphSchema().getIdType());
+                    long count = (Long) message.getField(1, 
org.apache.geaflow.common.type.primitive.LongType.INSTANCE);
+                    long messageType = (Long) message.getField(2, 
org.apache.geaflow.common.type.primitive.LongType.INSTANCE);
+                    
+                    // messageType = 1: neighbor count from the other target 
vertex (A or B)
+                    // messageType = 0: confirmation from common neighbor
+                    if (messageType == 1L) {
+                        // This is a count message from target vertex
+                        if (vertices.f0.equals(senderId)) {
+                            neighborCountA = count;
+                        } else if (vertices.f1.equals(senderId)) {
+                            neighborCountB = count;
+                        }
+                    } else {
+                        // This is a confirmation from a common neighbor
+                        localCommonNeighborCount++;
+                    }
+                }
+                
+                // Calculate and output the Jaccard coefficient only from 
vertex A
+                if (vertices.f0.equals(vertex.getId())) {
+                    // If neighborCountA is 0, calculate it from edges
+                    if (neighborCountA == 0) {
+                        Object sourceId = vertex.getId();
+                        List<RowEdge> edges = 
context.loadEdges(EdgeDirection.BOTH);
+                        Set<Object> neighbors = new HashSet<>();
+                        for (RowEdge edge : edges) {
+                            Object targetId = edge.getTargetId();
+                            if (!sourceId.equals(targetId)) {
+                                neighbors.add(targetId);
+                            }
+                        }
+                        neighborCountA = neighbors.size();
+                    }
+                    
+                    // Calculate Jaccard coefficient: |A ∩ B| / |A ∪ B|
+                    long intersection = localCommonNeighborCount;
+                    long union = neighborCountA + neighborCountB - 
intersection;
+                    double jaccardCoefficient = union == 0 ? 0.0 : (double) 
intersection / union;
+                    
+                    // Output the result
+                    context.take(ObjectRow.create(vertices.f0, vertices.f1, 
jaccardCoefficient));
+                }
+            } else {
+                // For non-A, non-B vertices: check if they received messages 
from both A and B
+                boolean receivedFromA = false;
+                boolean receivedFromB = false;
+                
+                while (messages.hasNext()) {
+                    ObjectRow message = messages.next();
+                    Object senderId = message.getField(0, 
context.getGraphSchema().getIdType());
+                    long messageType = (Long) message.getField(2, 
org.apache.geaflow.common.type.primitive.LongType.INSTANCE);
+                    
+                    // Only count messages with type 0 (neighbor inquiry)
+                    if (messageType == 0L) {
+                        if (vertices.f0.equals(senderId)) {
+                            receivedFromA = true;
+                        }
+                        if (vertices.f1.equals(senderId)) {
+                            receivedFromB = true;
+                        }
+                    }
+                }
+                
+                // If this vertex received messages from both A and B, it's a 
common neighbor
+                // Send confirmation to vertex A with format [vertexId, 1, 0]
+                if (receivedFromA && receivedFromB) {
+                    context.sendMessage(vertices.f0, 
ObjectRow.create(vertex.getId(), 1L, 0L));
+                }
+            }
+        }
+    }
+
+    @Override
+    public void finish(RowVertex graphVertex, Optional<Row> updatedValues) {
+        // No additional finish processing needed
+    }
+
+    @Override
+    public StructType getOutputType(GraphSchema graphSchema) {
+        return new StructType(
+            new TableField("vertex_a", graphSchema.getIdType(), false),
+            new TableField("vertex_b", graphSchema.getIdType(), false),
+            new TableField("jaccard_coefficient", DoubleType.INSTANCE, false)
+        );
+    }
+}
\ No newline at end of file
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLAlgorithmTest.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLAlgorithmTest.java
index 7982649f5..2fc903575 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLAlgorithmTest.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLAlgorithmTest.java
@@ -258,6 +258,16 @@ public class GQLAlgorithmTest {
             .checkSinkResult();
     }
 
+    @Test
+    public void testAlgorithmJaccardSimilarity() throws Exception {
+        QueryTester
+            .build()
+            .withGraphDefine("/query/modern_graph.sql")
+            .withQueryPath("/query/gql_algorithm_jaccard_similarity.sql")
+            .execute()
+            .checkSinkResult();
+    }
+
     @Test
     public void testEdgeIterator() throws Exception {
         QueryTester
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/IncrMatchTest.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/IncrMatchTest.java
index 527e2b999..bac6b1f93 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/IncrMatchTest.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/IncrMatchTest.java
@@ -29,7 +29,6 @@ import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.RandomUtils;
 import org.apache.geaflow.common.config.keys.DSLConfigKeys;
 import org.apache.geaflow.common.config.keys.FrameworkConfigKeys;
 import org.testng.Assert;
@@ -65,11 +64,26 @@ public class IncrMatchTest {
 
         String allPath = getTargetPath(queryPath);
         List<Set<String>> allRes = readRes(allPath, false);
-        Assert.assertEquals(incrRes.size(), allRes.size());
-
-        // the last is empty iteration, ignore.
-        for (int i = 0; i < incrRes.size() - 1; i++) {
-            Assert.assertEquals(allRes.get(i), incrRes.get(i));
+        
+        // Ensure both results have the same number of windows
+        Assert.assertEquals(incrRes.size(), allRes.size(), 
+            "Incremental and full traversal should have same number of windows");
+        
+        // For incremental traversal, each window contains cumulative results 
(all results from window 0 to current)
+        // For full traversal, each window contains only results from that 
specific window
+        // So we need to compare: incremental[i] should equal union of full[0] 
to full[i]
+        Set<String> cumulativeFull = new HashSet<>();
+        for (int i = 0; i < incrRes.size(); i++) {
+            Set<String> incrSet = incrRes.get(i);
+            Set<String> fullSet = allRes.get(i);
+            
+            // Add current window's results to cumulative set
+            cumulativeFull.addAll(fullSet);
+            
+            // Compare incremental result (cumulative) with cumulative full 
result
+            Assert.assertEquals(incrSet, cumulativeFull, 
+                String.format("Window %d mismatch: incremental (cumulative)=%s, full (cumulative)=%s", 
+                    i, incrSet, cumulativeFull));
         }
     }
 
@@ -103,12 +117,13 @@ public class IncrMatchTest {
         File edgeFile = new File("/tmp/geaflow-test/incr_modern_edge.txt");
         Set<Tuple2<Integer, Integer>> edges = new HashSet<>();
 
-        Random r = new Random();
+        // Use fixed seed for deterministic test results to avoid flaky tests
+        Random r = new Random(42L);
         while (edges.size() < edgeNum) {
-            int src = RandomUtils.nextInt(1, vertexNum + 1);
-            int dst = RandomUtils.nextInt(1, vertexNum + 1);
+            int src = r.nextInt(vertexNum) + 1;  // 1 to vertexNum
+            int dst = r.nextInt(vertexNum) + 1;  // 1 to vertexNum
             while (src == dst) {
-                dst = RandomUtils.nextInt(1, vertexNum + 1);
+                dst = r.nextInt(vertexNum) + 1;
             }
             edges.add(new Tuple2<>(src, dst));
         }
@@ -161,27 +176,37 @@ public class IncrMatchTest {
             String currentLine;
             while ((currentLine = reader.readLine()) != null) {
                 if (currentLine.equals(lineSplit)) {
+                    // Process window separator
                     if (curWindow.isEmpty()) {
+                        // Empty window: for incremental, keep history; for 
full, use empty
                         if (isIncr) {
                             res.add(new HashSet<>(allHistoryRes));
-                            continue;
                         } else {
                             res.add(new HashSet<>());
-                            continue;
                         }
-                    }
-
-                    if (isIncr) {
-                        allHistoryRes.addAll(curWindow);
-                        res.add(new HashSet<>(allHistoryRes));
                     } else {
-                        res.add(new HashSet<>(curWindow));
+                        // Non-empty window
+                        if (isIncr) {
+                            allHistoryRes.addAll(curWindow);
+                            res.add(new HashSet<>(allHistoryRes));
+                        } else {
+                            res.add(new HashSet<>(curWindow));
+                        }
                     }
                     curWindow = new HashSet<>();
                 } else {
                     curWindow.add(currentLine);
                 }
             }
+            // Handle last window if file doesn't end with separator
+            if (!curWindow.isEmpty()) {
+                if (isIncr) {
+                    allHistoryRes.addAll(curWindow);
+                    res.add(new HashSet<>(allHistoryRes));
+                } else {
+                    res.add(new HashSet<>(curWindow));
+                }
+            }
         } catch (IOException e) {
             e.printStackTrace();
         } finally {
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_algorithm_jaccard_similarity.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_algorithm_jaccard_similarity.txt
new file mode 100644
index 000000000..249576b58
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_algorithm_jaccard_similarity.txt
@@ -0,0 +1 @@
+1,3,0.2
\ No newline at end of file
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_algorithm_jaccard_similarity.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_algorithm_jaccard_similarity.sql
new file mode 100644
index 000000000..a72cb388c
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_algorithm_jaccard_similarity.sql
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+CREATE TABLE result_tb (
+   vertex_a int,
+   vertex_b int,
+   jaccard_coefficient double
+) WITH (
+      type='file',
+      geaflow.dsl.file.path='${target}'
+);
+
+USE GRAPH modern;
+
+INSERT INTO result_tb
+CALL jaccard_similarity(1, 3) YIELD (vertex_a, vertex_b, jaccard_coefficient)
+RETURN cast(vertex_a as int), cast(vertex_b as int), jaccard_coefficient
+;
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to