This is an automated email from the ASF dual-hosted git repository.

loogn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git


The following commit(s) were added to refs/heads/master by this push:
     new ab533347f feat: Implements lpa and cc algorithm (#670)
ab533347f is described below

commit ab533347f63e68babe3ecb939faacb0ee75f5e35
Author: moses <[email protected]>
AuthorDate: Tue Dec 2 09:32:26 2025 +0800

    feat: Implements lpa and cc algorithm (#670)
    
    * feat: Implements lpa and cc algorithm
    
    * fix
    
    * fix cc
    
    * clarify parameter name
    
    * improve cc
---
 .../schema/function/BuildInSqlFunctionTable.java   |   4 +
 .../geaflow/dsl/udf/graph/ConnectedComponents.java | 107 ++++++++++++++++++
 .../geaflow/dsl/udf/graph/LabelPropagation.java    | 121 +++++++++++++++++++++
 .../dsl/runtime/query/GQLAlgorithmTest.java        |  18 +++
 .../geaflow/dsl/runtime/query/QueryTester.java     |   4 +-
 .../src/test/resources/data/lpa_edges.txt          |  10 ++
 .../src/test/resources/data/lpa_vertex.txt         |  11 ++
 .../src/test/resources/expect/gql_algorithm_cc.txt |  11 ++
 .../test/resources/expect/gql_algorithm_lpa.txt    |  11 ++
 .../src/test/resources/query/gql_algorithm_cc.sql  |  78 +++++++++++++
 .../src/test/resources/query/gql_algorithm_lpa.sql |  78 +++++++++++++
 11 files changed, 451 insertions(+), 2 deletions(-)

diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java
index 185c4bfac..7c3d72fb9 100644
--- 
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java
@@ -38,6 +38,7 @@ import org.apache.geaflow.dsl.udf.graph.AllSourceShortestPath;
 import org.apache.geaflow.dsl.udf.graph.ClosenessCentrality;
 import org.apache.geaflow.dsl.udf.graph.ClusterCoefficient;
 import org.apache.geaflow.dsl.udf.graph.CommonNeighbors;
+import org.apache.geaflow.dsl.udf.graph.ConnectedComponents;
 import org.apache.geaflow.dsl.udf.graph.IncKHopAlgorithm;
 import org.apache.geaflow.dsl.udf.graph.IncMinimumSpanningTree;
 import org.apache.geaflow.dsl.udf.graph.IncWeakConnectedComponents;
@@ -45,6 +46,7 @@ import org.apache.geaflow.dsl.udf.graph.IncrementalKCore;
 import org.apache.geaflow.dsl.udf.graph.JaccardSimilarity;
 import org.apache.geaflow.dsl.udf.graph.KCore;
 import org.apache.geaflow.dsl.udf.graph.KHop;
+import org.apache.geaflow.dsl.udf.graph.LabelPropagation;
 import org.apache.geaflow.dsl.udf.graph.PageRank;
 import org.apache.geaflow.dsl.udf.graph.SingleSourceShortestPath;
 import org.apache.geaflow.dsl.udf.graph.TriangleCount;
@@ -223,6 +225,8 @@ public class BuildInSqlFunctionTable extends 
ListSqlOperatorTable {
             .add(GeaFlowFunction.of(CommonNeighbors.class))
             .add(GeaFlowFunction.of(JaccardSimilarity.class))
             .add(GeaFlowFunction.of(IncKHopAlgorithm.class))
+            .add(GeaFlowFunction.of(LabelPropagation.class))
+            .add(GeaFlowFunction.of(ConnectedComponents.class))
             .build();
 
     public BuildInSqlFunctionTable(GQLJavaTypeFactory typeFactory) {
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/ConnectedComponents.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/ConnectedComponents.java
new file mode 100644
index 000000000..5c0d8f95b
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/ConnectedComponents.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.udf.graph;
+
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.stream.Stream;
+import org.apache.geaflow.common.type.primitive.StringType;
+import org.apache.geaflow.dsl.common.algo.AlgorithmRuntimeContext;
+import org.apache.geaflow.dsl.common.algo.AlgorithmUserFunction;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.data.RowEdge;
+import org.apache.geaflow.dsl.common.data.RowVertex;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.common.function.Description;
+import org.apache.geaflow.dsl.common.types.GraphSchema;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.common.types.TableField;
+import org.apache.geaflow.model.graph.edge.EdgeDirection;
+
+@Description(name = "cc", description = "built-in udga for Connected 
Components Algorithm")
+public class ConnectedComponents implements AlgorithmUserFunction<Object, 
String> {
+
+    private AlgorithmRuntimeContext<Object, String> context;
+    private String outputKeyName = "component";
+    private int iteration = 20;
+
+    @Override
+    public void init(AlgorithmRuntimeContext<Object, String> context, Object[] 
parameters) {
+        this.context = context;
+        if (parameters.length > 2) {
+            throw new IllegalArgumentException(
+                "Only support zero or more arguments, false arguments "
+                    + "usage: func([iteration, [outputKeyName]])");
+        }
+        if (parameters.length > 0) {
+            iteration = Integer.parseInt(String.valueOf(parameters[0]));
+        }
+        if (parameters.length > 1) {
+            outputKeyName = String.valueOf(parameters[1]);
+        }
+    }
+
+    @Override
+    public void process(RowVertex vertex, Optional<Row> updatedValues, 
Iterator<String> messages) {
+        updatedValues.ifPresent(vertex::setValue);
+        Stream<RowEdge> stream = context.loadEdges(EdgeDirection.IN).stream();
+        if (context.getCurrentIterationId() == 1L) {
+            String initValue = String.valueOf(vertex.getId());
+            sendMessageToNeighbors(stream, initValue);
+            context.sendMessage(vertex.getId(), initValue);
+            context.updateVertexValue(ObjectRow.create(initValue));
+        } else if (context.getCurrentIterationId() < iteration) {
+            String minComponent = null;
+            while (messages.hasNext()) {
+                String next = messages.next();
+                if (minComponent == null || next.compareTo(minComponent) < 0) {
+                    minComponent = next;
+                }
+            }
+
+            String currentValue = (String) vertex.getValue().getField(0, 
StringType.INSTANCE);
+            // If found smaller component id, update and propagate
+            if (minComponent != null && minComponent.compareTo(currentValue) < 
0) {
+                sendMessageToNeighbors(stream, minComponent);
+                context.sendMessage(vertex.getId(), minComponent);
+                context.updateVertexValue(ObjectRow.create(minComponent));
+            }
+        }
+    }
+
+    @Override
+    public void finish(RowVertex graphVertex, Optional<Row> updatedValues) {
+        updatedValues.ifPresent(graphVertex::setValue);
+        String component = (String) graphVertex.getValue().getField(0, 
StringType.INSTANCE);
+        context.take(ObjectRow.create(graphVertex.getId(), component));
+    }
+
+    @Override
+    public StructType getOutputType(GraphSchema graphSchema) {
+        return new StructType(
+                new TableField("id", graphSchema.getIdType(), false),
+                new TableField(outputKeyName, StringType.INSTANCE, false)
+        );
+    }
+
+    private void sendMessageToNeighbors(Stream<RowEdge> edges, String message) 
{
+        edges.forEach(rowEdge -> context.sendMessage(rowEdge.getTargetId(), 
message));
+    }
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/LabelPropagation.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/LabelPropagation.java
new file mode 100644
index 000000000..33bd5be65
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/LabelPropagation.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.udf.graph;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.geaflow.common.type.primitive.StringType;
+import org.apache.geaflow.dsl.common.algo.AlgorithmRuntimeContext;
+import org.apache.geaflow.dsl.common.algo.AlgorithmUserFunction;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.data.RowEdge;
+import org.apache.geaflow.dsl.common.data.RowVertex;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.common.function.Description;
+import org.apache.geaflow.dsl.common.types.GraphSchema;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.common.types.TableField;
+import org.apache.geaflow.model.graph.edge.EdgeDirection;
+
+@Description(name = "lpa", description = "built-in udga for Label Propagation 
Algorithm")
+public class LabelPropagation implements AlgorithmUserFunction<Object, String> 
{
+
+    private AlgorithmRuntimeContext<Object, String> context;
+    private String outputKeyName = "label";
+    private int iteration = 1000;
+
+    @Override
+    public void init(AlgorithmRuntimeContext<Object, String> context, Object[] 
parameters) {
+        this.context = context;
+        if (parameters.length > 2) {
+            throw new IllegalArgumentException(
+                "Only support zero or more arguments, false arguments "
+                    + "usage: func([iteration, [outputKeyName]])");
+        }
+        if (parameters.length > 0) {
+            iteration = Integer.parseInt(String.valueOf(parameters[0]));
+        }
+        if (parameters.length > 1) {
+            outputKeyName = String.valueOf(parameters[1]);
+        }
+    }
+
+    @Override
+    public void process(RowVertex vertex, Optional<Row> updatedValues, 
Iterator<String> messages) {
+        updatedValues.ifPresent(vertex::setValue);
+        List<RowEdge> edges = new 
ArrayList<>(context.loadEdges(EdgeDirection.BOTH));
+
+        if (context.getCurrentIterationId() == 1L) {
+            String initLabel = String.valueOf(vertex.getId());
+            context.updateVertexValue(ObjectRow.create(initLabel));
+            sendMessageToNeighbors(edges, initLabel);
+        } else if (context.getCurrentIterationId() < iteration) {
+            Map<String, Integer> labelCounts = new HashMap<>();
+            String currentLabel = (String) vertex.getValue().getField(0, 
StringType.INSTANCE);
+
+            while (messages.hasNext()) {
+                String label = messages.next();
+                labelCounts.put(label, labelCounts.getOrDefault(label, 0) + 1);
+            }
+
+            String mostFrequentLabel = currentLabel;
+            int maxCount = 0;
+
+            for (Map.Entry<String, Integer> entry : labelCounts.entrySet()) {
+                String label = entry.getKey();
+                int count = entry.getValue();
+                if (count >= maxCount && label.compareTo(mostFrequentLabel) < 
0) {
+                    mostFrequentLabel = label;
+                    maxCount = count;
+                }
+            }
+
+            if (!mostFrequentLabel.equals(currentLabel)) {
+                context.updateVertexValue(ObjectRow.create(mostFrequentLabel));
+                sendMessageToNeighbors(edges, mostFrequentLabel);
+            }
+        }
+    }
+
+    @Override
+    public void finish(RowVertex graphVertex, Optional<Row> updatedValues) {
+        updatedValues.ifPresent(graphVertex::setValue);
+        String label = (String) graphVertex.getValue().getField(0, 
StringType.INSTANCE);
+        context.take(ObjectRow.create(graphVertex.getId(), label));
+    }
+
+    @Override
+    public StructType getOutputType(GraphSchema graphSchema) {
+        return new StructType(
+            new TableField("id", graphSchema.getIdType(), false),
+            new TableField(outputKeyName, StringType.INSTANCE, false)
+        );
+    }
+
+    private void sendMessageToNeighbors(List<RowEdge> edges, String message) {
+        for (RowEdge rowEdge : edges) {
+            context.sendMessage(rowEdge.getTargetId(), message);
+        }
+    }
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLAlgorithmTest.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLAlgorithmTest.java
index 0ad33935f..04c62509f 100644
--- 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLAlgorithmTest.java
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLAlgorithmTest.java
@@ -181,6 +181,24 @@ public class GQLAlgorithmTest {
             .checkSinkResult();
     }
 
+    @Test
+    public void testAlgorithmLabelPropagation() throws Exception {
+        QueryTester
+            .build()
+            .withQueryPath("/query/gql_algorithm_lpa.sql")
+            .execute()
+            .checkSinkResult();
+    }
+
+    @Test
+    public void testAlgorithmConnectedComponents() throws Exception {
+        QueryTester
+            .build()
+            .withQueryPath("/query/gql_algorithm_cc.sql")
+            .execute()
+            .checkSinkResult();
+    }
+
     @Test
     public void testIncGraphAlgorithm_001() throws Exception {
         QueryTester
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/QueryTester.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/QueryTester.java
index ccf8754b4..6ddcd691c 100644
--- 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/QueryTester.java
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/QueryTester.java
@@ -193,7 +193,7 @@ public class QueryTester implements Serializable {
 
     private void compareResult(String actualResult, String expectResult) {
         if (compareWithOrder) {
-            Assert.assertEquals(expectResult, actualResult);
+            Assert.assertEquals(actualResult, expectResult);
         } else {
             String[] actualLines = actualResult.split("\n");
             String[] expectLines = expectResult.split("\n");
@@ -209,7 +209,7 @@ public class QueryTester implements Serializable {
             String actualSort = StringUtils.join(actualLines, "\n");
             String expectSort = StringUtils.join(expectLines, "\n");
             if (!Objects.equals(actualSort, expectSort)) {
-                Assert.assertEquals(expectResult, actualResult);
+                Assert.assertEquals(actualResult, expectResult);
             }
         }
     }
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/lpa_edges.txt 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/lpa_edges.txt
new file mode 100644
index 000000000..67efcd94d
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/lpa_edges.txt
@@ -0,0 +1,10 @@
+1,2
+1,3
+2,3
+4,5
+4,6
+4,7
+8,9
+9,10
+10,9
+9,8
\ No newline at end of file
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/lpa_vertex.txt
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/lpa_vertex.txt
new file mode 100644
index 000000000..a7904c0f1
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/lpa_vertex.txt
@@ -0,0 +1,11 @@
+1,1, ,
+2,1, ,
+3,1, ,
+4,1, ,
+5,1, ,
+6,1, ,
+7,1, ,
+8,1, ,
+9,1, ,
+10,1, ,
+11,1, ,
\ No newline at end of file
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_algorithm_cc.txt
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_algorithm_cc.txt
new file mode 100644
index 000000000..7e9ed5c11
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_algorithm_cc.txt
@@ -0,0 +1,11 @@
+1,1
+5,5
+9,10
+3,3
+4,4
+2,2
+10,10
+11,11
+7,7
+6,6
+8,10
\ No newline at end of file
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_algorithm_lpa.txt
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_algorithm_lpa.txt
new file mode 100644
index 000000000..2e325c434
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_algorithm_lpa.txt
@@ -0,0 +1,11 @@
+1,1
+5,4
+9,10
+3,1
+4,4
+2,1
+10,10
+11,11
+7,4
+6,4
+8,10
\ No newline at end of file
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_algorithm_cc.sql
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_algorithm_cc.sql
new file mode 100644
index 000000000..660f3256b
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_algorithm_cc.sql
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+set geaflow.dsl.window.size = -1;
+set geaflow.dsl.ignore.exception = true;
+
+CREATE GRAPH IF NOT EXISTS g5 (
+  Vertex v5 (
+    vid varchar ID,
+    vvalue int
+  ),
+  Edge e5 (
+    srcId varchar SOURCE ID,
+    targetId varchar DESTINATION ID
+  )
+) WITH (
+  storeType='rocksdb',
+  shardCount = 1
+);
+
+CREATE TABLE IF NOT EXISTS v_source (
+    v_id varchar,
+    v_value int,
+    ts varchar,
+    type varchar
+) WITH (
+  type='file',
+  geaflow.dsl.file.path = 'resource:///data/lpa_vertex.txt'
+);
+
+CREATE TABLE IF NOT EXISTS e_source (
+    src_id varchar,
+    dst_id varchar
+) WITH (
+  type='file',
+  geaflow.dsl.file.path = 'resource:///data/lpa_edges.txt'
+);
+
+CREATE TABLE IF NOT EXISTS tbl_result (
+  v_id varchar,
+  k_value varchar
+) WITH (
+  type='file',
+  geaflow.dsl.file.path = '${target}'
+);
+
+USE GRAPH g5;
+
+INSERT INTO g5.v5(vid, vvalue)
+SELECT
+v_id, v_value
+FROM v_source;
+
+INSERT INTO g5.e5(srcId, targetId)
+SELECT
+ src_id, dst_id
+FROM e_source;
+
+INSERT INTO tbl_result(v_id, k_value)
+CALL cc() YIELD (vid, kValue)
+RETURN vid, kValue
+;
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_algorithm_lpa.sql
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_algorithm_lpa.sql
new file mode 100644
index 000000000..2d750b185
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_algorithm_lpa.sql
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+set geaflow.dsl.window.size = -1;
+set geaflow.dsl.ignore.exception = true;
+
+CREATE GRAPH IF NOT EXISTS g5 (
+  Vertex v5 (
+    vid varchar ID,
+    vvalue int
+  ),
+  Edge e5 (
+    srcId varchar SOURCE ID,
+    targetId varchar DESTINATION ID
+  )
+) WITH (
+  storeType='rocksdb',
+  shardCount = 1
+);
+
+CREATE TABLE IF NOT EXISTS v_source (
+    v_id varchar,
+    v_value int,
+    ts varchar,
+    type varchar
+) WITH (
+  type='file',
+  geaflow.dsl.file.path = 'resource:///data/lpa_vertex.txt'
+);
+
+CREATE TABLE IF NOT EXISTS e_source (
+    src_id varchar,
+    dst_id varchar
+) WITH (
+  type='file',
+  geaflow.dsl.file.path = 'resource:///data/lpa_edges.txt'
+);
+
+CREATE TABLE IF NOT EXISTS tbl_result (
+  v_id varchar,
+  k_value varchar
+) WITH (
+  type='file',
+  geaflow.dsl.file.path = '${target}'
+);
+
+USE GRAPH g5;
+
+INSERT INTO g5.v5(vid, vvalue)
+SELECT
+v_id, v_value
+FROM v_source;
+
+INSERT INTO g5.e5(srcId, targetId)
+SELECT
+ src_id, dst_id
+FROM e_source;
+
+INSERT INTO tbl_result(v_id, k_value)
+CALL lpa() YIELD (vid, kValue)
+RETURN vid, kValue
+;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to