This is an automated email from the ASF dual-hosted git repository. jin pushed a commit to branch olap-algo in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
commit 790ada9df013f3895b2d11b525fc0259e9e4e161 Author: houzhizhen <[email protected]> AuthorDate: Fri Jun 5 06:33:01 2020 -0500 add weak connected component analysis (#21) --- .../hugegraph/job/algorithm/AbstractAlgorithm.java | 8 + .../hugegraph/job/algorithm/AlgorithmPool.java | 2 + .../job/algorithm/comm/WeakConnectedComponent.java | 221 +++++++++++++++++++++ 3 files changed, 231 insertions(+) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java index aab04db39..f94912902 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java @@ -415,6 +415,14 @@ public abstract class AbstractAlgorithm implements Algorithm { }); } + protected Vertex vertex(Object id) { + Iterator<Vertex> iter = this.graph().vertices(id); + if (!iter.hasNext()) { + return null; + } + return iter.next(); + } + protected Iterator<Vertex> filter(Iterator<Vertex> vertices, String key, Object value) { return new FilterIterator<>(vertices, vertex -> { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AlgorithmPool.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AlgorithmPool.java index 5725be61f..7a8e4291a 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AlgorithmPool.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AlgorithmPool.java @@ -31,6 +31,7 @@ import com.baidu.hugegraph.job.algorithm.comm.KCoreAlgorithm; import com.baidu.hugegraph.job.algorithm.comm.LouvainAlgorithm; import com.baidu.hugegraph.job.algorithm.comm.LpaAlgorithm; import com.baidu.hugegraph.job.algorithm.comm.TriangleCountAlgorithm; +import com.baidu.hugegraph.job.algorithm.comm.WeakConnectedComponent; import com.baidu.hugegraph.job.algorithm.path.RingsDetectAlgorithm; import com.baidu.hugegraph.job.algorithm.rank.PageRankAlgorithm; import com.baidu.hugegraph.job.algorithm.similarity.FusiformSimilarityAlgorithm; @@ -52,6 +53,7 @@ public class AlgorithmPool { INSTANCE.register(new ClusterCoeffcientAlgorithm()); INSTANCE.register(new LpaAlgorithm()); INSTANCE.register(new LouvainAlgorithm()); + INSTANCE.register(new WeakConnectedComponent()); INSTANCE.register(new FusiformSimilarityAlgorithm()); INSTANCE.register(new RingsDetectAlgorithm()); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/WeakConnectedComponent.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/WeakConnectedComponent.java new file mode 100644 index 000000000..99dee85cd --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/WeakConnectedComponent.java @@ -0,0 +1,221 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.job.algorithm.comm; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.tinkerpop.gremlin.structure.Edge; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.slf4j.Logger; + +import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.job.Job; +import com.baidu.hugegraph.schema.SchemaManager; +import com.baidu.hugegraph.schema.VertexLabel; +import com.baidu.hugegraph.structure.HugeEdge; +import com.baidu.hugegraph.structure.HugeVertex; +import com.baidu.hugegraph.type.define.Directions; +import com.baidu.hugegraph.util.Log; +import com.google.common.collect.ImmutableMap; + +public class WeakConnectedComponent extends AbstractCommAlgorithm { + + protected static final Logger LOG = Log.logger(WeakConnectedComponent.class); + + @Override + public String name() { + return "weak_connected_component"; + } + + @Override + public void checkParameters(Map<String, Object> parameters) { + times(parameters); + directionOutIn(parameters); + degree(parameters); + } + + @Override + public Object call(Job<Object> job, Map<String, Object> parameters) { + try (Traverser traverser = new Traverser(job)) { + return traverser.connectedComponent(times(parameters), + directionOutIn(parameters), + degree(parameters)); + } catch (Throwable e) { + job.graph().tx().rollback(); + throw e; + } + } + + protected static class Traverser extends AlgoTraverser { + + private final Map<Id, Id> vertexComponentMap = new HashMap<>(); + + public Traverser(Job<Object> job) { + super(job); + } + + public Object connectedComponent(int maxTimes, + Directions direction, + long degree) { + this.initSchema(); + this.initVertexComponentMap(); + int times; + + for (times = 0; times < maxTimes; times++) { + long changeCount = 0; + Id currentSourceVertexId = null; + // the edges are ordered by ownerVertex + Iterator<Edge> edges = this.edges(direction); + List<Id> adjacentVertices = new ArrayList<>(); + + while (edges.hasNext()) { + HugeEdge edge = (HugeEdge) edges.next(); + Id sourceVertexId = edge.ownerVertex().id(); + Id targetVertexId = edge.otherVertex().id(); + + if (currentSourceVertexId == null) { + currentSourceVertexId = sourceVertexId; + adjacentVertices.add(targetVertexId); + } else if (currentSourceVertexId.equals(sourceVertexId)) { + if (adjacentVertices.size() < degree) { + adjacentVertices.add(targetVertexId); + } + } else { + changeCount += this.findAndSetMinComponent( + currentSourceVertexId, + adjacentVertices); + adjacentVertices = new ArrayList<>(); + currentSourceVertexId = sourceVertexId; + adjacentVertices.add(targetVertexId); + } + } + changeCount += this.findAndSetMinComponent( + currentSourceVertexId, + adjacentVertices); + LOG.debug("iterationTimes:{}, changeCount:{}", + times, changeCount); + + if (changeCount == 0L) { + break; + } + } + + int compCount = writeBackValue(); + return ImmutableMap.of("components", compCount, + "iteration_times", times, + "times", maxTimes); + } + + private void initSchema() { + String cl = C_LABEL; + SchemaManager schema = this.graph().schema(); + schema.propertyKey(cl).asText().ifNotExist().create(); + for (VertexLabel vl : schema.getVertexLabels()) { + schema.vertexLabel(vl.name()).properties(cl) + .nullableKeys(cl).append(); + } + } + + private void initVertexComponentMap() { + Iterator<Vertex> vertices = this.vertices(); + while (vertices.hasNext()) { + Id id = ((HugeVertex) vertices.next()).id(); + this.vertexComponentMap.put(id, id); + } + } + + /** + * process for a vertex and its adjacentVertices + * @param sourceVertexId the source vertex + * @param adjacentVertices the adjacent vertices attached to source + * vertex + * @return the count of vertex that changed Component + */ + private long findAndSetMinComponent(Id sourceVertexId, + List<Id> adjacentVertices) { + if (!this.vertexComponentMap.containsKey(sourceVertexId)) { + return 0L; + } + Id min = this.findMinComponent(sourceVertexId, adjacentVertices); + return this.updateComponentIfNeeded(min, + sourceVertexId, + adjacentVertices); + } + + private Id findMinComponent(Id sourceVertexId, + List<Id> adjacentVertices) { + Id min = this.vertexComponentMap.get(sourceVertexId); + for (Id vertex : adjacentVertices) { + Id comp = this.vertexComponentMap.get(vertex); + if (comp != null && comp.compareTo(min) < 0) { + min = comp; + } + } + return min; + } + + private long updateComponentIfNeeded(Id min, + Id sourceVertexId, + List<Id> adjacentVertices) { + long changedCount = 0; + Id comp = this.vertexComponentMap.get(sourceVertexId); + if (comp.compareTo(min) > 0) { + this.vertexComponentMap.put(sourceVertexId, min); + changedCount++; + } + for (Id vertex : adjacentVertices) { + comp = this.vertexComponentMap.get(vertex); + if (comp != null && comp.compareTo(min) > 0) { + this.vertexComponentMap.put(vertex, min); + changedCount++; + } + } + return changedCount; + } + + /** + * @return the count of components + */ + private int writeBackValue() { + Map<Id, Integer> componentIndexMap = new HashMap<>(); + int index = 0; + for (Map.Entry<Id, Id> entry : this.vertexComponentMap.entrySet()) { + Id comp = entry.getValue(); + Integer componentIndex = componentIndexMap.get(comp); + if (componentIndex == null) { + componentIndex = index; + componentIndexMap.put(comp, componentIndex); + index++; + } + Vertex vertex = this.vertex(entry.getKey()); + if (vertex != null) { + vertex.property(C_LABEL, String.valueOf(componentIndex)); + this.commitIfNeeded(); + } + } + this.graph().tx().commit(); + return index; + } + } +}
