This is an automated email from the ASF dual-hosted git repository.
loogn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git
The following commit(s) were added to refs/heads/master by this push:
new ab533347f feat: Implements lpa and cc algorithm (#670)
ab533347f is described below
commit ab533347f63e68babe3ecb939faacb0ee75f5e35
Author: moses <[email protected]>
AuthorDate: Tue Dec 2 09:32:26 2025 +0800
feat: Implements lpa and cc algorithm (#670)
* feat: Implements lpa and cc algorithm
* fix
* fix cc
* clarify parameter name
* improve cc
---
.../schema/function/BuildInSqlFunctionTable.java | 4 +
.../geaflow/dsl/udf/graph/ConnectedComponents.java | 107 ++++++++++++++++++
.../geaflow/dsl/udf/graph/LabelPropagation.java | 121 +++++++++++++++++++++
.../dsl/runtime/query/GQLAlgorithmTest.java | 18 +++
.../geaflow/dsl/runtime/query/QueryTester.java | 4 +-
.../src/test/resources/data/lpa_edges.txt | 10 ++
.../src/test/resources/data/lpa_vertex.txt | 11 ++
.../src/test/resources/expect/gql_algorithm_cc.txt | 11 ++
.../test/resources/expect/gql_algorithm_lpa.txt | 11 ++
.../src/test/resources/query/gql_algorithm_cc.sql | 78 +++++++++++++
.../src/test/resources/query/gql_algorithm_lpa.sql | 78 +++++++++++++
11 files changed, 451 insertions(+), 2 deletions(-)
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java
index 185c4bfac..7c3d72fb9 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java
@@ -38,6 +38,7 @@ import org.apache.geaflow.dsl.udf.graph.AllSourceShortestPath;
import org.apache.geaflow.dsl.udf.graph.ClosenessCentrality;
import org.apache.geaflow.dsl.udf.graph.ClusterCoefficient;
import org.apache.geaflow.dsl.udf.graph.CommonNeighbors;
+import org.apache.geaflow.dsl.udf.graph.ConnectedComponents;
import org.apache.geaflow.dsl.udf.graph.IncKHopAlgorithm;
import org.apache.geaflow.dsl.udf.graph.IncMinimumSpanningTree;
import org.apache.geaflow.dsl.udf.graph.IncWeakConnectedComponents;
@@ -45,6 +46,7 @@ import org.apache.geaflow.dsl.udf.graph.IncrementalKCore;
import org.apache.geaflow.dsl.udf.graph.JaccardSimilarity;
import org.apache.geaflow.dsl.udf.graph.KCore;
import org.apache.geaflow.dsl.udf.graph.KHop;
+import org.apache.geaflow.dsl.udf.graph.LabelPropagation;
import org.apache.geaflow.dsl.udf.graph.PageRank;
import org.apache.geaflow.dsl.udf.graph.SingleSourceShortestPath;
import org.apache.geaflow.dsl.udf.graph.TriangleCount;
@@ -223,6 +225,8 @@ public class BuildInSqlFunctionTable extends
ListSqlOperatorTable {
.add(GeaFlowFunction.of(CommonNeighbors.class))
.add(GeaFlowFunction.of(JaccardSimilarity.class))
.add(GeaFlowFunction.of(IncKHopAlgorithm.class))
+ .add(GeaFlowFunction.of(LabelPropagation.class))
+ .add(GeaFlowFunction.of(ConnectedComponents.class))
.build();
public BuildInSqlFunctionTable(GQLJavaTypeFactory typeFactory) {
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/ConnectedComponents.java
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/ConnectedComponents.java
new file mode 100644
index 000000000..5c0d8f95b
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/ConnectedComponents.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.udf.graph;
+
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.stream.Stream;
+import org.apache.geaflow.common.type.primitive.StringType;
+import org.apache.geaflow.dsl.common.algo.AlgorithmRuntimeContext;
+import org.apache.geaflow.dsl.common.algo.AlgorithmUserFunction;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.data.RowEdge;
+import org.apache.geaflow.dsl.common.data.RowVertex;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.common.function.Description;
+import org.apache.geaflow.dsl.common.types.GraphSchema;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.common.types.TableField;
+import org.apache.geaflow.model.graph.edge.EdgeDirection;
+
+@Description(name = "cc", description = "built-in udga for Connected
Components Algorithm")
+public class ConnectedComponents implements AlgorithmUserFunction<Object,
String> {
+
+ private AlgorithmRuntimeContext<Object, String> context;
+ private String outputKeyName = "component";
+ private int iteration = 20;
+
+ @Override
+ public void init(AlgorithmRuntimeContext<Object, String> context, Object[]
parameters) {
+ this.context = context;
+ if (parameters.length > 2) {
+ throw new IllegalArgumentException(
+ "Only support zero or more arguments, false arguments "
+ + "usage: func([iteration, [outputKeyName]])");
+ }
+ if (parameters.length > 0) {
+ iteration = Integer.parseInt(String.valueOf(parameters[0]));
+ }
+ if (parameters.length > 1) {
+ outputKeyName = String.valueOf(parameters[1]);
+ }
+ }
+
+ @Override
+ public void process(RowVertex vertex, Optional<Row> updatedValues,
Iterator<String> messages) {
+ updatedValues.ifPresent(vertex::setValue);
+ Stream<RowEdge> stream = context.loadEdges(EdgeDirection.IN).stream();
+ if (context.getCurrentIterationId() == 1L) {
+ String initValue = String.valueOf(vertex.getId());
+ sendMessageToNeighbors(stream, initValue);
+ context.sendMessage(vertex.getId(), initValue);
+ context.updateVertexValue(ObjectRow.create(initValue));
+ } else if (context.getCurrentIterationId() < iteration) {
+ String minComponent = null;
+ while (messages.hasNext()) {
+ String next = messages.next();
+ if (minComponent == null || next.compareTo(minComponent) < 0) {
+ minComponent = next;
+ }
+ }
+
+ String currentValue = (String) vertex.getValue().getField(0,
StringType.INSTANCE);
+ // If found smaller component id, update and propagate
+ if (minComponent != null && minComponent.compareTo(currentValue) <
0) {
+ sendMessageToNeighbors(stream, minComponent);
+ context.sendMessage(vertex.getId(), minComponent);
+ context.updateVertexValue(ObjectRow.create(minComponent));
+ }
+ }
+ }
+
+ @Override
+ public void finish(RowVertex graphVertex, Optional<Row> updatedValues) {
+ updatedValues.ifPresent(graphVertex::setValue);
+ String component = (String) graphVertex.getValue().getField(0,
StringType.INSTANCE);
+ context.take(ObjectRow.create(graphVertex.getId(), component));
+ }
+
+ @Override
+ public StructType getOutputType(GraphSchema graphSchema) {
+ return new StructType(
+ new TableField("id", graphSchema.getIdType(), false),
+ new TableField(outputKeyName, StringType.INSTANCE, false)
+ );
+ }
+
+ private void sendMessageToNeighbors(Stream<RowEdge> edges, String message)
{
+ edges.forEach(rowEdge -> context.sendMessage(rowEdge.getTargetId(),
message));
+ }
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/LabelPropagation.java
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/LabelPropagation.java
new file mode 100644
index 000000000..33bd5be65
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/LabelPropagation.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.udf.graph;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.geaflow.common.type.primitive.StringType;
+import org.apache.geaflow.dsl.common.algo.AlgorithmRuntimeContext;
+import org.apache.geaflow.dsl.common.algo.AlgorithmUserFunction;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.data.RowEdge;
+import org.apache.geaflow.dsl.common.data.RowVertex;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.common.function.Description;
+import org.apache.geaflow.dsl.common.types.GraphSchema;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.common.types.TableField;
+import org.apache.geaflow.model.graph.edge.EdgeDirection;
+
+@Description(name = "lpa", description = "built-in udga for Label Propagation
Algorithm")
+public class LabelPropagation implements AlgorithmUserFunction<Object, String>
{
+
+ private AlgorithmRuntimeContext<Object, String> context;
+ private String outputKeyName = "label";
+ private int iteration = 1000;
+
+ @Override
+ public void init(AlgorithmRuntimeContext<Object, String> context, Object[]
parameters) {
+ this.context = context;
+ if (parameters.length > 2) {
+ throw new IllegalArgumentException(
+ "Only support zero or more arguments, false arguments "
+ + "usage: func([iteration, [outputKeyName]])");
+ }
+ if (parameters.length > 0) {
+ iteration = Integer.parseInt(String.valueOf(parameters[0]));
+ }
+ if (parameters.length > 1) {
+ outputKeyName = String.valueOf(parameters[1]);
+ }
+ }
+
+ @Override
+ public void process(RowVertex vertex, Optional<Row> updatedValues,
Iterator<String> messages) {
+ updatedValues.ifPresent(vertex::setValue);
+ List<RowEdge> edges = new
ArrayList<>(context.loadEdges(EdgeDirection.BOTH));
+
+ if (context.getCurrentIterationId() == 1L) {
+ String initLabel = String.valueOf(vertex.getId());
+ context.updateVertexValue(ObjectRow.create(initLabel));
+ sendMessageToNeighbors(edges, initLabel);
+ } else if (context.getCurrentIterationId() < iteration) {
+ Map<String, Integer> labelCounts = new HashMap<>();
+ String currentLabel = (String) vertex.getValue().getField(0,
StringType.INSTANCE);
+
+ while (messages.hasNext()) {
+ String label = messages.next();
+ labelCounts.put(label, labelCounts.getOrDefault(label, 0) + 1);
+ }
+
+ String mostFrequentLabel = currentLabel;
+ int maxCount = 0;
+
+ for (Map.Entry<String, Integer> entry : labelCounts.entrySet()) {
+ String label = entry.getKey();
+ int count = entry.getValue();
+ if (count >= maxCount && label.compareTo(mostFrequentLabel) <
0) {
+ mostFrequentLabel = label;
+ maxCount = count;
+ }
+ }
+
+ if (!mostFrequentLabel.equals(currentLabel)) {
+ context.updateVertexValue(ObjectRow.create(mostFrequentLabel));
+ sendMessageToNeighbors(edges, mostFrequentLabel);
+ }
+ }
+ }
+
+ @Override
+ public void finish(RowVertex graphVertex, Optional<Row> updatedValues) {
+ updatedValues.ifPresent(graphVertex::setValue);
+ String label = (String) graphVertex.getValue().getField(0,
StringType.INSTANCE);
+ context.take(ObjectRow.create(graphVertex.getId(), label));
+ }
+
+ @Override
+ public StructType getOutputType(GraphSchema graphSchema) {
+ return new StructType(
+ new TableField("id", graphSchema.getIdType(), false),
+ new TableField(outputKeyName, StringType.INSTANCE, false)
+ );
+ }
+
+ private void sendMessageToNeighbors(List<RowEdge> edges, String message) {
+ for (RowEdge rowEdge : edges) {
+ context.sendMessage(rowEdge.getTargetId(), message);
+ }
+ }
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLAlgorithmTest.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLAlgorithmTest.java
index 0ad33935f..04c62509f 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLAlgorithmTest.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLAlgorithmTest.java
@@ -181,6 +181,24 @@ public class GQLAlgorithmTest {
.checkSinkResult();
}
+ @Test
+ public void testAlgorithmLabelPropagation() throws Exception {
+ QueryTester
+ .build()
+ .withQueryPath("/query/gql_algorithm_lpa.sql")
+ .execute()
+ .checkSinkResult();
+ }
+
+ @Test
+ public void testAlgorithmConnectedComponents() throws Exception {
+ QueryTester
+ .build()
+ .withQueryPath("/query/gql_algorithm_cc.sql")
+ .execute()
+ .checkSinkResult();
+ }
+
@Test
public void testIncGraphAlgorithm_001() throws Exception {
QueryTester
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/QueryTester.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/QueryTester.java
index ccf8754b4..6ddcd691c 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/QueryTester.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/QueryTester.java
@@ -193,7 +193,7 @@ public class QueryTester implements Serializable {
private void compareResult(String actualResult, String expectResult) {
if (compareWithOrder) {
- Assert.assertEquals(expectResult, actualResult);
+ Assert.assertEquals(actualResult, expectResult);
} else {
String[] actualLines = actualResult.split("\n");
String[] expectLines = expectResult.split("\n");
@@ -209,7 +209,7 @@ public class QueryTester implements Serializable {
String actualSort = StringUtils.join(actualLines, "\n");
String expectSort = StringUtils.join(expectLines, "\n");
if (!Objects.equals(actualSort, expectSort)) {
- Assert.assertEquals(expectResult, actualResult);
+ Assert.assertEquals(actualResult, expectResult);
}
}
}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/lpa_edges.txt
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/lpa_edges.txt
new file mode 100644
index 000000000..67efcd94d
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/lpa_edges.txt
@@ -0,0 +1,10 @@
+1,2
+1,3
+2,3
+4,5
+4,6
+4,7
+8,9
+9,10
+10,9
+9,8
\ No newline at end of file
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/lpa_vertex.txt
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/lpa_vertex.txt
new file mode 100644
index 000000000..a7904c0f1
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/lpa_vertex.txt
@@ -0,0 +1,11 @@
+1,1, ,
+2,1, ,
+3,1, ,
+4,1, ,
+5,1, ,
+6,1, ,
+7,1, ,
+8,1, ,
+9,1, ,
+10,1, ,
+11,1, ,
\ No newline at end of file
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_algorithm_cc.txt
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_algorithm_cc.txt
new file mode 100644
index 000000000..7e9ed5c11
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_algorithm_cc.txt
@@ -0,0 +1,11 @@
+1,1
+5,5
+9,10
+3,3
+4,4
+2,2
+10,10
+11,11
+7,7
+6,6
+8,10
\ No newline at end of file
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_algorithm_lpa.txt
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_algorithm_lpa.txt
new file mode 100644
index 000000000..2e325c434
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_algorithm_lpa.txt
@@ -0,0 +1,11 @@
+1,1
+5,4
+9,10
+3,1
+4,4
+2,1
+10,10
+11,11
+7,4
+6,4
+8,10
\ No newline at end of file
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_algorithm_cc.sql
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_algorithm_cc.sql
new file mode 100644
index 000000000..660f3256b
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_algorithm_cc.sql
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+set geaflow.dsl.window.size = -1;
+set geaflow.dsl.ignore.exception = true;
+
+CREATE GRAPH IF NOT EXISTS g5 (
+ Vertex v5 (
+ vid varchar ID,
+ vvalue int
+ ),
+ Edge e5 (
+ srcId varchar SOURCE ID,
+ targetId varchar DESTINATION ID
+ )
+) WITH (
+ storeType='rocksdb',
+ shardCount = 1
+);
+
+CREATE TABLE IF NOT EXISTS v_source (
+ v_id varchar,
+ v_value int,
+ ts varchar,
+ type varchar
+) WITH (
+ type='file',
+ geaflow.dsl.file.path = 'resource:///data/lpa_vertex.txt'
+);
+
+CREATE TABLE IF NOT EXISTS e_source (
+ src_id varchar,
+ dst_id varchar
+) WITH (
+ type='file',
+ geaflow.dsl.file.path = 'resource:///data/lpa_edges.txt'
+);
+
+CREATE TABLE IF NOT EXISTS tbl_result (
+ v_id varchar,
+ k_value varchar
+) WITH (
+ type='file',
+ geaflow.dsl.file.path = '${target}'
+);
+
+USE GRAPH g5;
+
+INSERT INTO g5.v5(vid, vvalue)
+SELECT
+v_id, v_value
+FROM v_source;
+
+INSERT INTO g5.e5(srcId, targetId)
+SELECT
+ src_id, dst_id
+FROM e_source;
+
+INSERT INTO tbl_result(v_id, k_value)
+CALL cc() YIELD (vid, kValue)
+RETURN vid, kValue
+;
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_algorithm_lpa.sql
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_algorithm_lpa.sql
new file mode 100644
index 000000000..2d750b185
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_algorithm_lpa.sql
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+set geaflow.dsl.window.size = -1;
+set geaflow.dsl.ignore.exception = true;
+
+CREATE GRAPH IF NOT EXISTS g5 (
+ Vertex v5 (
+ vid varchar ID,
+ vvalue int
+ ),
+ Edge e5 (
+ srcId varchar SOURCE ID,
+ targetId varchar DESTINATION ID
+ )
+) WITH (
+ storeType='rocksdb',
+ shardCount = 1
+);
+
+CREATE TABLE IF NOT EXISTS v_source (
+ v_id varchar,
+ v_value int,
+ ts varchar,
+ type varchar
+) WITH (
+ type='file',
+ geaflow.dsl.file.path = 'resource:///data/lpa_vertex.txt'
+);
+
+CREATE TABLE IF NOT EXISTS e_source (
+ src_id varchar,
+ dst_id varchar
+) WITH (
+ type='file',
+ geaflow.dsl.file.path = 'resource:///data/lpa_edges.txt'
+);
+
+CREATE TABLE IF NOT EXISTS tbl_result (
+ v_id varchar,
+ k_value varchar
+) WITH (
+ type='file',
+ geaflow.dsl.file.path = '${target}'
+);
+
+USE GRAPH g5;
+
+INSERT INTO g5.v5(vid, vvalue)
+SELECT
+v_id, v_value
+FROM v_source;
+
+INSERT INTO g5.e5(srcId, targetId)
+SELECT
+ src_id, dst_id
+FROM e_source;
+
+INSERT INTO tbl_result(v_id, k_value)
+CALL lpa() YIELD (vid, kValue)
+RETURN vid, kValue
+;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]