This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch olap-algo
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git

commit 790ada9df013f3895b2d11b525fc0259e9e4e161
Author: houzhizhen <[email protected]>
AuthorDate: Fri Jun 5 06:33:01 2020 -0500

    add weak connected component analysis (#21)
---
 .../hugegraph/job/algorithm/AbstractAlgorithm.java |   8 +
 .../hugegraph/job/algorithm/AlgorithmPool.java     |   2 +
 .../job/algorithm/comm/WeakConnectedComponent.java | 221 +++++++++++++++++++++
 3 files changed, 231 insertions(+)

diff --git 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
index aab04db39..f94912902 100644
--- 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
+++ 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
@@ -415,6 +415,14 @@ public abstract class AbstractAlgorithm implements 
Algorithm {
             });
         }
 
+        protected Vertex vertex(Object id) { // Returns the first vertex the graph yields for the given id
+            Iterator<Vertex> iter = this.graph().vertices(id);
+            if (!iter.hasNext()) {
+                return null; // nothing found for this id; callers must handle null
+            }
+            return iter.next(); // only the first element is consumed; the iterator is not closed here
+        }
+
         protected Iterator<Vertex> filter(Iterator<Vertex> vertices,
                                           String key, Object value) {
             return new FilterIterator<>(vertices, vertex -> {
diff --git 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AlgorithmPool.java
 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AlgorithmPool.java
index 5725be61f..7a8e4291a 100644
--- 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AlgorithmPool.java
+++ 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AlgorithmPool.java
@@ -31,6 +31,7 @@ import com.baidu.hugegraph.job.algorithm.comm.KCoreAlgorithm;
 import com.baidu.hugegraph.job.algorithm.comm.LouvainAlgorithm;
 import com.baidu.hugegraph.job.algorithm.comm.LpaAlgorithm;
 import com.baidu.hugegraph.job.algorithm.comm.TriangleCountAlgorithm;
+import com.baidu.hugegraph.job.algorithm.comm.WeakConnectedComponent;
 import com.baidu.hugegraph.job.algorithm.path.RingsDetectAlgorithm;
 import com.baidu.hugegraph.job.algorithm.rank.PageRankAlgorithm;
 import 
com.baidu.hugegraph.job.algorithm.similarity.FusiformSimilarityAlgorithm;
@@ -52,6 +53,7 @@ public class AlgorithmPool {
         INSTANCE.register(new ClusterCoeffcientAlgorithm());
         INSTANCE.register(new LpaAlgorithm());
         INSTANCE.register(new LouvainAlgorithm());
+        INSTANCE.register(new WeakConnectedComponent());
 
         INSTANCE.register(new FusiformSimilarityAlgorithm());
         INSTANCE.register(new RingsDetectAlgorithm());
diff --git 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/WeakConnectedComponent.java
 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/WeakConnectedComponent.java
new file mode 100644
index 000000000..99dee85cd
--- /dev/null
+++ 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/WeakConnectedComponent.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.comm;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.backend.id.Id;
+import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.schema.SchemaManager;
+import com.baidu.hugegraph.schema.VertexLabel;
+import com.baidu.hugegraph.structure.HugeEdge;
+import com.baidu.hugegraph.structure.HugeVertex;
+import com.baidu.hugegraph.type.define.Directions;
+import com.baidu.hugegraph.util.Log;
+import com.google.common.collect.ImmutableMap;
+
+public class WeakConnectedComponent extends AbstractCommAlgorithm {
+
+    protected static final Logger LOG = 
Log.logger(WeakConnectedComponent.class);
+
+    @Override
+    public String name() { // Returns the fixed name string of this algorithm
+        return "weak_connected_component";
+    }
+
+    @Override
+    public void checkParameters(Map<String, Object> parameters) { // Calls the same three accessors that call() uses, discarding results
+        times(parameters); // presumably throws on an invalid value; confirm in the parent class
+        directionOutIn(parameters);
+        degree(parameters);
+    }
+
+    @Override
+    public Object call(Job<Object> job, Map<String, Object> parameters) { // Runs the analysis with a Traverser created for this call
+        try (Traverser traverser = new Traverser(job)) { // try-with-resources closes the traverser on exit
+            return traverser.connectedComponent(times(parameters),
+                                                directionOutIn(parameters),
+                                                degree(parameters));
+        } catch (Throwable e) {
+            job.graph().tx().rollback(); // roll back the graph transaction before propagating the failure
+            throw e; // rethrown unchanged
+        }
+    }
+
+    protected static class Traverser extends AlgoTraverser {
+
+        private final Map<Id, Id> vertexComponentMap = new HashMap<>();
+
+        public Traverser(Job<Object> job) { // Only forwards the job to the AlgoTraverser constructor
+            super(job);
+        }
+
+        public Object connectedComponent(int maxTimes, // upper bound on the number of passes over the edges
+                                         Directions direction, // passed through to this.edges(direction)
+                                         long degree) { // cap on neighbours collected per source vertex
+            this.initSchema(); // make sure the C_LABEL property can be stored on vertices
+            this.initVertexComponentMap(); // every vertex starts as its own component
+            int times;
+            
+            for (times = 0; times < maxTimes; times++) { // one pass over all edges per iteration
+                long changeCount = 0;
+                Id currentSourceVertexId = null;
+                // the edges are ordered by ownerVertex; the grouping below relies on that order
+                Iterator<Edge> edges = this.edges(direction);
+                List<Id> adjacentVertices = new ArrayList<>();
+
+                while (edges.hasNext()) {
+                    HugeEdge edge = (HugeEdge) edges.next();
+                    Id sourceVertexId = edge.ownerVertex().id();
+                    Id targetVertexId = edge.otherVertex().id();
+
+                    if (currentSourceVertexId == null) { // first edge of this pass
+                        currentSourceVertexId = sourceVertexId;
+                        adjacentVertices.add(targetVertexId); // NOTE(review): added without the degree check below
+                    } else if (currentSourceVertexId.equals(sourceVertexId)) { // same source as the previous edge
+                        if (adjacentVertices.size() < degree) { // NOTE(review): never true if degree can be a negative "no limit" value; confirm
+                            adjacentVertices.add(targetVertexId);
+                        }
+                    } else { // source changed: process the finished group, then start a new one
+                        changeCount += this.findAndSetMinComponent(
+                                       currentSourceVertexId,
+                                       adjacentVertices);
+                        adjacentVertices = new ArrayList<>();
+                        currentSourceVertexId = sourceVertexId;
+                        adjacentVertices.add(targetVertexId); // NOTE(review): also bypasses the degree check
+                    }
+                }
+                changeCount += this.findAndSetMinComponent( // process the last group; the helper returns 0 for an id missing from the map
+                               currentSourceVertexId,
+                               adjacentVertices);
+                LOG.debug("iterationTimes:{}, changeCount:{}",
+                          times, changeCount);
+
+                if (changeCount == 0L) { // no component assignment changed during this pass
+                    break; // NOTE(review): here `times` is the 0-based index of this pass; it equals maxTimes only when the loop runs out
+                }
+            }
+
+            int compCount = writeBackValue(); // persist results and count distinct components
+            return ImmutableMap.of("components", compCount,
+                                   "iteration_times", times,
+                                   "times", maxTimes);
+        }
+
+        private void initSchema() { // Creates the C_LABEL text property key if absent and appends it to every vertex label
+            String cl = C_LABEL;
+            SchemaManager schema = this.graph().schema();
+            schema.propertyKey(cl).asText().ifNotExist().create();
+            for (VertexLabel vl : schema.getVertexLabels()) {
+                schema.vertexLabel(vl.name()).properties(cl)
+                      .nullableKeys(cl).append(); // appended as a nullable key on each existing label
+            }
+        }
+
+        private void initVertexComponentMap() { // Seeds the map so that each vertex id maps to itself
+            Iterator<Vertex> vertices = this.vertices();
+            while (vertices.hasNext()) {
+                Id id = ((HugeVertex) vertices.next()).id();
+                this.vertexComponentMap.put(id, id); // the map ends up holding one entry per vertex returned by this.vertices()
+            }
+        }
+
+        /**
+         * Assign the smallest component id found among a vertex and its adjacent vertices to all of them
+         * @param sourceVertexId the source vertex
+         * @param adjacentVertices the adjacent vertices attached to the source
+         *                         vertex
+         * @return the count of vertices whose component changed
+         */
+        private long findAndSetMinComponent(Id sourceVertexId,
+                                            List<Id> adjacentVertices) {
+            if (!this.vertexComponentMap.containsKey(sourceVertexId)) {
+                return 0L; // source has no entry in the map: nothing is updated
+            }
+            Id min = this.findMinComponent(sourceVertexId, adjacentVertices);
+            return this.updateComponentIfNeeded(min,
+                                                sourceVertexId,
+                                                adjacentVertices);
+        }
+
+        private Id findMinComponent(Id sourceVertexId, // Returns the smallest component id (by compareTo) among the source and its neighbours
+                                    List<Id> adjacentVertices) {
+            Id min = this.vertexComponentMap.get(sourceVertexId); // start from the source's current component
+            for (Id vertex : adjacentVertices) {
+                Id comp = this.vertexComponentMap.get(vertex);
+                if (comp != null && comp.compareTo(min) < 0) { // neighbours without a map entry are ignored
+                    min = comp;
+                }
+            }
+            return min;
+        }
+
+        private long updateComponentIfNeeded(Id min, // Lowers the source and each neighbour to `min`; returns how many entries changed
+                                             Id sourceVertexId,
+                                             List<Id> adjacentVertices) {
+            long changedCount = 0;
+            Id comp = this.vertexComponentMap.get(sourceVertexId); // the only caller has already checked that this key exists
+            if (comp.compareTo(min) > 0) {
+                this.vertexComponentMap.put(sourceVertexId, min);
+                changedCount++;
+            }
+            for (Id vertex : adjacentVertices) {
+                comp = this.vertexComponentMap.get(vertex);
+                if (comp != null && comp.compareTo(min) > 0) { // neighbours without a map entry are left untouched
+                    this.vertexComponentMap.put(vertex, min);
+                    changedCount++;
+                }
+            }
+            return changedCount;
+        }
+
+        /**
+         * @return the count of components
+         */
+        private int writeBackValue() { // Stores a per-component index on each vertex under C_LABEL
+            Map<Id, Integer> componentIndexMap = new HashMap<>(); // component id -> index in order of first appearance
+            int index = 0;
+            for (Map.Entry<Id, Id> entry : this.vertexComponentMap.entrySet()) 
{
+                Id comp = entry.getValue();
+                Integer componentIndex = componentIndexMap.get(comp);
+                if (componentIndex == null) { // first time this component id is seen: give it the next index
+                    componentIndex = index;
+                    componentIndexMap.put(comp, componentIndex);
+                    index++;
+                }
+                Vertex vertex = this.vertex(entry.getKey());
+                if (vertex != null) { // vertices that can no longer be fetched are skipped
+                    vertex.property(C_LABEL, String.valueOf(componentIndex)); // the index is written as a string
+                    this.commitIfNeeded(); // presumably commits in batches; confirm in AlgoTraverser
+                }
+            }
+            this.graph().tx().commit(); // final commit after the loop
+            return index; // equals the number of distinct component ids seen
+        }
+    }
+}

Reply via email to