This is an automated email from the ASF dual-hosted git repository. jin pushed a commit to branch olap-algo in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
commit fa306b4b9bff15630ac793cbfc7c9af40acc2dd2 Author: Jermy Li <[email protected]> AuthorDate: Thu Apr 9 11:22:56 2020 +0800 implement 8 olap algorithms (#4) * add olap algo api * improve source filter * fix louvain shaking with limit degree * catch exception for lpa and louvain * add 3 params for lpa: label,source_label,percision * improve louvain node store * remove vertices from class Community * move showCommunity to AbstractCommAlgorithm * add some parameters to AbstractAlgorithm * improve louvain log * improve clearPass and communities check * split louvain cache * fix degreeCentrality bug: degree is always < 500 Change-Id: I2341b981dab44f43ac50ae0f8fa5e51b7acc1b5a --- .../com/baidu/hugegraph/api/job/AlgorithmAPI.java | 84 +++ .../java/com/baidu/hugegraph/job/AlgorithmJob.java | 71 ++ .../hugegraph/job/algorithm/AbstractAlgorithm.java | 516 +++++++++++++++ .../baidu/hugegraph/job/algorithm/Algorithm.java | 35 + .../hugegraph/job/algorithm/AlgorithmPool.java | 71 ++ .../job/algorithm/CountEdgeAlgorithm.java | 79 +++ .../job/algorithm/CountVertexAlgorithm.java | 79 +++ .../job/algorithm/cent/AbstractCentAlgorithm.java | 113 ++++ .../cent/BetweenessCentralityAlgorithm.java | 101 +++ .../cent/ClosenessCentralityAlgorithm.java | 111 ++++ .../algorithm/cent/DegreeCentralityAlgorithm.java | 140 ++++ .../cent/EigenvectorCentralityAlgorithm.java | 100 +++ .../job/algorithm/comm/AbstractCommAlgorithm.java | 78 +++ .../algorithm/comm/ClusterCoeffcientAlgorithm.java | 70 ++ .../job/algorithm/comm/LouvainAlgorithm.java | 83 +++ .../job/algorithm/comm/LouvainTraverser.java | 715 +++++++++++++++++++++ .../hugegraph/job/algorithm/comm/LpaAlgorithm.java | 263 ++++++++ .../job/algorithm/comm/TriangleCountAlgorithm.java | 153 +++++ 18 files changed, 2862 insertions(+) diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/AlgorithmAPI.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/AlgorithmAPI.java new file mode 100644 index 000000000..c965e02a5 
--- /dev/null +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/AlgorithmAPI.java @@ -0,0 +1,84 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.api.job; + +import java.util.Map; + +import javax.inject.Singleton; +import javax.ws.rs.Consumes; +import javax.ws.rs.NotFoundException; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; + +import org.slf4j.Logger; + +import com.baidu.hugegraph.HugeGraph; +import com.baidu.hugegraph.api.API; +import com.baidu.hugegraph.api.filter.StatusFilter.Status; +import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.core.GraphManager; +import com.baidu.hugegraph.job.AlgorithmJob; +import com.baidu.hugegraph.job.JobBuilder; +import com.baidu.hugegraph.server.RestServer; +import com.baidu.hugegraph.util.E; +import com.baidu.hugegraph.util.JsonUtil; +import com.baidu.hugegraph.util.Log; +import com.codahale.metrics.annotation.Timed; +import com.google.common.collect.ImmutableMap; + +@Path("graphs/{graph}/jobs/algorithm") +@Singleton +public class AlgorithmAPI extends API { + + private static 
final Logger LOG = Log.logger(RestServer.class); + + @POST + @Timed + @Path("/{name}") + @Status(Status.CREATED) + @Consumes(APPLICATION_JSON) + @Produces(APPLICATION_JSON_WITH_CHARSET) + public Map<String, Id> post(@Context GraphManager manager, + @PathParam("graph") String graph, + @PathParam("name") String algorithm, + Map<String, Object> parameters) { + LOG.debug("Graph [{}] schedule algorithm job: {}", graph, parameters); + E.checkArgument(algorithm != null && !algorithm.isEmpty(), + "The algorithm name can't be empty"); + if (parameters == null) { + parameters = ImmutableMap.of(); + } + if (!AlgorithmJob.check(algorithm, parameters)) { + throw new NotFoundException("Not found algorithm: " + algorithm); + } + + HugeGraph g = graph(manager, graph); + Map<String, Object> input = ImmutableMap.of("algorithm", algorithm, + "parameters", parameters); + JobBuilder<Object> builder = JobBuilder.of(g); + builder.name("algorithm:" + algorithm) + .input(JsonUtil.toJson(input)) + .job(new AlgorithmJob()); + return ImmutableMap.of("task_id", builder.schedule().id()); + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/AlgorithmJob.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/AlgorithmJob.java new file mode 100644 index 000000000..7e752ac42 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/AlgorithmJob.java @@ -0,0 +1,71 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.job; + +import java.util.Map; + +import com.baidu.hugegraph.job.algorithm.Algorithm; +import com.baidu.hugegraph.job.algorithm.AlgorithmPool; +import com.baidu.hugegraph.util.E; +import com.baidu.hugegraph.util.JsonUtil; + +public class AlgorithmJob extends Job<Object> { + + public static final String TASK_TYPE = "algorithm"; + + public static boolean check(String name, Map<String, Object> parameters) { + Algorithm algorithm = AlgorithmPool.instance().find(name); + if (algorithm == null) { + return false; + } + algorithm.checkParameters(parameters); + return true; + } + + @Override + public String type() { + return TASK_TYPE; + } + + @Override + public Object execute() throws Exception { + String input = this.task().input(); + E.checkArgumentNotNull(input, "The input can't be null"); + @SuppressWarnings("unchecked") + Map<String, Object> map = JsonUtil.fromJson(input, Map.class); + + Object value = map.get("algorithm"); + E.checkArgument(value instanceof String, + "Invalid algorithm name '%s'", value); + String name = (String) value; + + value = map.get("parameters"); + E.checkArgument(value instanceof Map, + "Invalid algorithm parameters '%s'", value); + @SuppressWarnings("unchecked") + Map<String, Object> parameters = (Map<String, Object>) value; + + AlgorithmPool pool = AlgorithmPool.instance(); + Algorithm algorithm = pool.find(name); + E.checkArgument(algorithm != null, + "There is no algorithm named '%s'", name); + return algorithm.call(this, parameters); + } +} diff --git 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java new file mode 100644 index 000000000..660ef9f8f --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java @@ -0,0 +1,516 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.job.algorithm; + + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.Callable; + +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.commons.lang3.mutable.MutableLong; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; +import org.apache.tinkerpop.gremlin.structure.Edge; +import org.apache.tinkerpop.gremlin.structure.Element; +import org.apache.tinkerpop.gremlin.structure.Property; +import org.apache.tinkerpop.gremlin.structure.Transaction; +import org.apache.tinkerpop.gremlin.structure.Vertex; + +import com.baidu.hugegraph.HugeException; +import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.backend.query.ConditionQuery; +import com.baidu.hugegraph.backend.query.Query; +import com.baidu.hugegraph.iterator.FilterIterator; +import com.baidu.hugegraph.iterator.FlatMapperIterator; +import com.baidu.hugegraph.job.Job; +import com.baidu.hugegraph.testutil.Whitebox; +import com.baidu.hugegraph.traversal.algorithm.HugeTraverser; +import com.baidu.hugegraph.type.HugeType; +import com.baidu.hugegraph.type.define.Directions; +import com.baidu.hugegraph.type.define.HugeKeys; +import com.baidu.hugegraph.util.Bytes; +import com.baidu.hugegraph.util.E; +import com.baidu.hugegraph.util.JsonUtil; + +import jersey.repackaged.com.google.common.base.Objects; + +@SuppressWarnings("deprecation") // StringEscapeUtils +public abstract class AbstractAlgorithm implements Algorithm { + + public static final long MAX_RESULT_SIZE = 100L * Bytes.MB; + public static final long MAX_QUERY_LIMIT = 10000000L; // about 10GB + public static final int BATCH = 500; + + public static final String CATEGORY_AGGR = "aggregate"; + public static final String CATEGORY_PATH = "path"; + public static final String CATEGORY_RANK = "rank"; + public static final String CATEGORY_SIMI = "similarity"; + public 
static final String CATEGORY_COMM = "community"; + public static final String CATEGORY_CENT = "centrality"; + + public static final String KEY_DIRECTION = "direction"; + public static final String KEY_LABEL = "label"; + public static final String KEY_DEPTH = "depth"; + public static final String KEY_DEGREE = "degree"; + public static final String KEY_SAMPLE = "sample"; + public static final String KEY_SOURCE_SAMPLE = "source_sample"; + public static final String KEY_SOURCE_LABEL = "source_label"; + public static final String KEY_SOURCE_CLABEL = "source_clabel"; + public static final String KEY_TOP = "top"; + public static final String KEY_TIMES = "times"; + public static final String KEY_STABLE_TIMES = "stable_times"; + public static final String KEY_PRECISION = "precision"; + public static final String KEY_SHOW_COMM = "show_community"; + public static final String KEY_CLEAR = "clear"; + public static final String KEY_CAPACITY = "capacity"; + public static final String KEY_LIMIT = "limit"; + + public static final long DEFAULT_CAPACITY = 10000000L; + public static final long DEFAULT_LIMIT = 100L; + public static final long DEFAULT_DEGREE = 100L; + public static final long DEFAULT_SAMPLE = 1L; + public static final long DEFAULT_TIMES = 20L; + public static final long DEFAULT_STABLE_TIMES= 3L; + public static final double DEFAULT_PRECISION = 1.0 / 1000; + + @Override + public void checkParameters(Map<String, Object> parameters) { + E.checkArgument(parameters.isEmpty(), + "Unnecessary parameters: %s", parameters); + } + + protected static int depth(Map<String, Object> parameters) { + int depth = parameterInt(parameters, KEY_DEPTH); + E.checkArgument(depth > 0, + "The value of %s must be > 0, but got %s", + KEY_DEPTH, depth); + return depth; + } + + protected static String edgeLabel(Map<String, Object> parameters) { + if (!parameters.containsKey(KEY_LABEL)) { + return null; + } + return parameterString(parameters, KEY_LABEL); + } + + protected static Directions 
direction(Map<String, Object> parameters) { + Object direction = parameter(parameters, KEY_DIRECTION); + return parseDirection(direction); + } + + protected static long top(Map<String, Object> parameters) { + if (!parameters.containsKey(KEY_TOP)) { + return 0L; + } + long top = parameterLong(parameters, KEY_TOP); + E.checkArgument(top >= 0L, + "The value of %s must be >= 0, but got %s", + KEY_TOP, top); + return top; + } + + protected static long degree(Map<String, Object> parameters) { + if (!parameters.containsKey(KEY_DEGREE)) { + return DEFAULT_DEGREE; + } + long degree = parameterLong(parameters, KEY_DEGREE); + HugeTraverser.checkDegree(degree); + return degree; + } + + protected static long capacity(Map<String, Object> parameters) { + if (!parameters.containsKey(KEY_CAPACITY)) { + return DEFAULT_CAPACITY; + } + long capacity = parameterLong(parameters, KEY_CAPACITY); + HugeTraverser.checkCapacity(capacity); + return capacity; + } + + protected static long limit(Map<String, Object> parameters) { + if (!parameters.containsKey(KEY_LIMIT)) { + return DEFAULT_LIMIT; + } + long limit = parameterLong(parameters, KEY_LIMIT); + HugeTraverser.checkLimit(limit); + return limit; + } + + protected static long sample(Map<String, Object> parameters) { + if (!parameters.containsKey(KEY_SAMPLE)) { + return DEFAULT_SAMPLE; + } + long sample = parameterLong(parameters, KEY_SAMPLE); + HugeTraverser.checkPositiveOrNoLimit(sample, KEY_SAMPLE); + return sample; + } + + protected static long sourceSample(Map<String, Object> parameters) { + if (!parameters.containsKey(KEY_SOURCE_SAMPLE)) { + return HugeTraverser.NO_LIMIT; + } + long sample = parameterLong(parameters, KEY_SOURCE_SAMPLE); + HugeTraverser.checkPositiveOrNoLimit(sample, KEY_SOURCE_SAMPLE); + return sample; + } + + protected static String sourceLabel(Map<String, Object> parameters) { + if (!parameters.containsKey(KEY_SOURCE_LABEL)) { + return null; + } + return parameterString(parameters, KEY_SOURCE_LABEL); + } + + 
protected static String sourceCLabel(Map<String, Object> parameters) { + if (!parameters.containsKey(KEY_SOURCE_CLABEL)) { + return null; + } + return parameterString(parameters, KEY_SOURCE_CLABEL); + } + + public static Object parameter(Map<String, Object> parameters, String key) { + Object value = parameters.get(key); + E.checkArgument(value != null, + "Expect '%s' in parameters: %s", + key, parameters); + return value; + } + + public static String parameterString(Map<String, Object> parameters, + String key) { + Object value = parameter(parameters, key); + E.checkArgument(value instanceof String, + "Expect string value for parameter '%s': '%s'", + key, value); + return (String) value; + } + + public static int parameterInt(Map<String, Object> parameters, + String key) { + Object value = parameter(parameters, key); + E.checkArgument(value instanceof Number, + "Expect int value for parameter '%s': '%s'", + key, value); + return ((Number) value).intValue(); + } + + public static long parameterLong(Map<String, Object> parameters, + String key) { + Object value = parameter(parameters, key); + E.checkArgument(value instanceof Number, + "Expect long value for parameter '%s': '%s'", + key, value); + return ((Number) value).longValue(); + } + + public static double parameterDouble(Map<String, Object> parameters, + String key) { + Object value = parameter(parameters, key); + E.checkArgument(value instanceof Number, + "Expect double value for parameter '%s': '%s'", + key, value); + return ((Number) value).doubleValue(); + } + + public static boolean parameterBoolean(Map<String, Object> parameters, + String key) { + Object value = parameter(parameters, key); + E.checkArgument(value instanceof Boolean, + "Expect boolean value for parameter '%s': '%s'", + key, value); + return ((Boolean) value); + } + + public static Directions parseDirection(Object direction) { + if (direction.equals(Directions.BOTH.toString())) { + return Directions.BOTH; + } else if 
(direction.equals(Directions.OUT.toString())) { + return Directions.OUT; + } else if (direction.equals(Directions.IN.toString())) { + return Directions.IN; + } else { + throw new IllegalArgumentException(String.format( + "The value of direction must be in [OUT, IN, BOTH], " + + "but got '%s'", direction)); + } + } + + public static class AlgoTraverser extends HugeTraverser { + + private final Job<Object> job; + protected long progress; + + public AlgoTraverser(Job<Object> job) { + super(job.graph()); + this.job = job; + } + + public void updateProgress(long progress) { + this.job.updateProgress((int) progress); + } + + protected Iterator<Vertex> vertices() { + return this.vertices(Query.NO_LIMIT); + } + + protected Iterator<Vertex> vertices(long limit) { + Query query = new Query(HugeType.VERTEX); + query.capacity(Query.NO_CAPACITY); + query.limit(limit); + return this.graph().vertices(query); + } + + protected Iterator<Vertex> vertices(Object label, String key, + Object value, long limit) { + Iterator<Vertex> vertices = this.vertices(label, limit); + if (key != null) { + vertices = filter(vertices, key, value); + } + return vertices; + } + + protected Iterator<Vertex> vertices(Object label, long limit) { + if (label == null) { + return this.vertices(limit); + } + ConditionQuery query = new ConditionQuery(HugeType.VERTEX); + query.capacity(Query.NO_CAPACITY); + query.limit(limit); + if (label != null) { + query.eq(HugeKeys.LABEL, this.getVertexLabelId(label)); + } + return this.graph().vertices(query); + } + + protected Iterator<Vertex> vertices(Iterator<Object> ids) { + return new FlatMapperIterator<>(ids, id -> { + return this.graph().vertices(id); + }); + } + + protected Iterator<Vertex> filter(Iterator<Vertex> vertices, + String key, Object value) { + return new FilterIterator<>(vertices, vertex -> { + boolean matched = match(vertex, key, value); + if (!matched) { + this.updateProgress(++this.progress); + } + return matched; + }); + } + + protected static 
boolean match(Element elem, String key, Object value) { + Property<Object> p = elem.property(key); + return p.isPresent() && Objects.equal(p.value(), value); + } + + protected Iterator<Edge> edges(Directions dir) { + HugeType type = dir == null ? HugeType.EDGE : dir.type(); + Query query = new Query(type); + query.capacity(Query.NO_CAPACITY); + query.limit(Query.NO_LIMIT); + return this.graph().edges(query); + } + + protected void drop(GraphTraversal<?, ? extends Element> traversal) { + this.execute(traversal, () -> { + while (traversal.hasNext()) { + this.updateProgress(++this.progress); + traversal.next().remove(); + this.commitIfNeeded(); + } + return null; + }); + this.graph().tx().commit(); + } + + protected <V> V execute(GraphTraversal<?, ?> traversal, + Callable<V> callback) { + long capacity = Query.defaultCapacity(MAX_QUERY_LIMIT); + try { + return callback.call(); + } catch (Exception e) { + throw new HugeException("Failed to execute algorithm", e); + } finally { + Query.defaultCapacity(capacity); + try { + traversal.close(); + } catch (Exception e) { + throw new HugeException("Can't close traversal", e); + } + } + } + + protected void commitIfNeeded() { + // commit if needed + Transaction tx = this.graph().tx(); + Whitebox.invoke(tx.getClass(), "commitIfGtSize", tx, BATCH); + } + } + + public static final class TopMap { + + private final long topN; + private Map<Id, MutableLong> tops; + + public TopMap(long topN) { + this.topN = topN; + this.tops = new HashMap<>(); + } + + public int size() { + return this.tops.size(); + } + + public void put(Id key, long value) { + this.put(key, Long.valueOf(value)); + } + + public void put(Id key, Long value) { + this.tops.put(key, new MutableLong(value)); + // keep 2x buffer + if (this.tops.size() > this.topN * 2) { + this.shrinkIfNeeded(this.topN); + } + } + + public Set<Map.Entry<Id, MutableLong>> entrySet() { + this.shrinkIfNeeded(this.topN); + return this.tops.entrySet(); + } + + private void shrinkIfNeeded(long 
limit) { + if (this.tops.size() >= limit && limit != HugeTraverser.NO_LIMIT) { + this.tops = HugeTraverser.topN(this.tops, true, limit); + } + } + } + + public static final class JsonMap { + + private final StringBuilder json; + + public JsonMap() { + this(4 * (int) Bytes.KB); + } + + public JsonMap(int initCapaticy) { + this.json = new StringBuilder(initCapaticy); + } + + public void startObject() { + this.json.append('{'); + } + + public void endObject() { + this.deleteLastComma(); + this.json.append('}'); + } + + public void startList() { + this.json.append('['); + } + + public void endList() { + this.deleteLastComma(); + this.json.append(']'); + } + + public void deleteLastComma() { + int last = this.json.length() - 1; + if (last >= 0 && this.json.charAt(last) == ',') { + this.json.deleteCharAt(last); + } + } + + public void appendKey(String key) { + this.appendString(key).append(':'); + } + + public void append(long value) { + this.json.append(value).append(','); + this.checkSizeLimit(); + } + + public void append(String value) { + this.appendString(value).append(','); + this.checkSizeLimit(); + } + + public void append(Object key, long value) { + this.append(key.toString(), value); + } + + public void append(String key, long value) { + this.appendString(key).append(':'); + this.json.append(value).append(','); + this.checkSizeLimit(); + } + + public void append(Object key, Number value) { + this.append(key.toString(), value); + } + + public void append(String key, Number value) { + this.appendString(key).append(':'); + this.json.append(value).append(','); + this.checkSizeLimit(); + } + + public void append(String key, String value) { + this.appendString(key).append(':'); + this.appendString(value).append(','); + this.checkSizeLimit(); + } + + public void appendRaw(String key, String rawJson) { + this.appendString(key).append(':'); + this.json.append(rawJson).append(','); + this.checkSizeLimit(); + } + + public void append(Set<Entry<Id, MutableLong>> kvs) { + 
for (Map.Entry<Id, MutableLong> top : kvs) { + this.append(top.getKey(), top.getValue()); + } + } + + private StringBuilder appendString(String str) { + if (str.indexOf('"') >= 0) { + str = StringEscapeUtils.escapeJson(str); + } + return this.json.append('"').append(str).append('"'); + } + + public void checkSizeLimit() { + E.checkArgument(this.json.length() < MAX_RESULT_SIZE, + "The result size exceeds limit %s", + MAX_RESULT_SIZE); + } + + public Object asJson() { + return JsonUtil.asJson(this.json.toString()); + } + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Algorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Algorithm.java new file mode 100644 index 000000000..6ad200157 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Algorithm.java @@ -0,0 +1,35 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.job.algorithm; + +import java.util.Map; + +import com.baidu.hugegraph.job.Job; + +public interface Algorithm { + + public String name(); + + public String category(); + + public Object call(Job<Object> job, Map<String, Object> parameters); + + public void checkParameters(Map<String, Object> parameters); +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AlgorithmPool.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AlgorithmPool.java new file mode 100644 index 000000000..98f7c89dc --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AlgorithmPool.java @@ -0,0 +1,71 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.job.algorithm; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import com.baidu.hugegraph.job.algorithm.cent.BetweenessCentralityAlgorithm; +import com.baidu.hugegraph.job.algorithm.cent.ClosenessCentralityAlgorithm; +import com.baidu.hugegraph.job.algorithm.cent.DegreeCentralityAlgorithm; +import com.baidu.hugegraph.job.algorithm.cent.EigenvectorCentralityAlgorithm; +import com.baidu.hugegraph.job.algorithm.comm.ClusterCoeffcientAlgorithm; +import com.baidu.hugegraph.job.algorithm.comm.LouvainAlgorithm; +import com.baidu.hugegraph.job.algorithm.comm.LpaAlgorithm; +import com.baidu.hugegraph.job.algorithm.comm.TriangleCountAlgorithm; + +public class AlgorithmPool { + + private static final AlgorithmPool INSTANCE = new AlgorithmPool(); + + static { + INSTANCE.register(new CountVertexAlgorithm()); + INSTANCE.register(new CountEdgeAlgorithm()); + + INSTANCE.register(new DegreeCentralityAlgorithm()); + INSTANCE.register(new BetweenessCentralityAlgorithm()); + INSTANCE.register(new ClosenessCentralityAlgorithm()); + INSTANCE.register(new EigenvectorCentralityAlgorithm()); + + INSTANCE.register(new TriangleCountAlgorithm()); + INSTANCE.register(new ClusterCoeffcientAlgorithm()); + INSTANCE.register(new LpaAlgorithm()); + INSTANCE.register(new LouvainAlgorithm()); + } + + private final Map<String, Algorithm> algorithms; + + public AlgorithmPool() { + this.algorithms = new ConcurrentHashMap<>(); + } + + public Algorithm register(Algorithm algo) { + assert !this.algorithms.containsKey(algo.name()); + return this.algorithms.put(algo.name(), algo); + } + + public Algorithm find(String name) { + return this.algorithms.get(name); + } + + public static AlgorithmPool instance() { + return INSTANCE; + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountEdgeAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountEdgeAlgorithm.java new file mode 100644 index 
000000000..9fb122348 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountEdgeAlgorithm.java @@ -0,0 +1,79 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.job.algorithm; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import org.apache.commons.lang3.mutable.MutableLong; +import org.apache.tinkerpop.gremlin.structure.Edge; + +import com.baidu.hugegraph.job.Job; +import com.baidu.hugegraph.util.JsonUtil; + +public class CountEdgeAlgorithm extends AbstractAlgorithm { + + @Override + public String name() { + return "count_edge"; + } + + @Override + public String category() { + return CATEGORY_AGGR; + } + + @Override + public Object call(Job<Object> job, Map<String, Object> parameters) { + Traverser traverser = new Traverser(job); + return traverser.count(); + } + + private static class Traverser extends AlgoTraverser { + + public Traverser(Job<Object> job) { + super(job); + } + + public Object count() { + Iterator<Edge> edges = this.edges(null); + + Map<String, MutableLong> counts = new HashMap<>(); + long total = 0L; + + while (edges.hasNext()) { + Edge edge = edges.next(); + 
String label = edge.label(); + MutableLong count = counts.get(label); + if (count != null) { + count.increment(); + } else { + counts.put(label, new MutableLong(1L)); + } + total++; + this.updateProgress(total); + } + counts.put("*", new MutableLong(total)); + + return JsonUtil.asJson(counts); + } + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountVertexAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountVertexAlgorithm.java new file mode 100644 index 000000000..582e0bb69 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountVertexAlgorithm.java @@ -0,0 +1,79 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.job.algorithm; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import org.apache.commons.lang3.mutable.MutableLong; +import org.apache.tinkerpop.gremlin.structure.Vertex; + +import com.baidu.hugegraph.job.Job; +import com.baidu.hugegraph.util.JsonUtil; + +public class CountVertexAlgorithm extends AbstractAlgorithm { + + @Override + public String name() { + return "count_vertex"; + } + + @Override + public String category() { + return CATEGORY_AGGR; + } + + @Override + public Object call(Job<Object> job, Map<String, Object> parameters) { + Traverser traverser = new Traverser(job); + return traverser.count(); + } + + private static class Traverser extends AlgoTraverser { + + public Traverser(Job<Object> job) { + super(job); + } + + public Object count() { + Iterator<Vertex> vertices = this.vertices(); + + Map<String, MutableLong> counts = new HashMap<>(); + long total = 0L; + + while (vertices.hasNext()) { + Vertex vertex = vertices.next(); + String label = vertex.label(); + MutableLong count = counts.get(label); + if (count != null) { + count.increment(); + } else { + counts.put(label, new MutableLong(1L)); + } + total++; + this.updateProgress(total); + } + counts.put("*", new MutableLong(total)); + + return JsonUtil.asJson(counts); + } + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/AbstractCentAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/AbstractCentAlgorithm.java new file mode 100644 index 000000000..14841043a --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/AbstractCentAlgorithm.java @@ -0,0 +1,113 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.job.algorithm.cent; + +import java.util.Map; + +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; +import org.apache.tinkerpop.gremlin.structure.Vertex; + +import com.baidu.hugegraph.job.Job; +import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm; +import com.baidu.hugegraph.job.algorithm.comm.LpaAlgorithm; + +public abstract class AbstractCentAlgorithm extends AbstractAlgorithm { + + protected static final String C_LABEL = LpaAlgorithm.Traverser.C_LABEL; + + @Override + public String category() { + return CATEGORY_CENT; + } + + @Override + public void checkParameters(Map<String, Object> parameters) { + depth(parameters); + degree(parameters); + sample(parameters); + sourceSample(parameters); + sourceLabel(parameters); + sourceCLabel(parameters); + top(parameters); + } + + public static class Traverser extends AlgoTraverser { + + public Traverser(Job<Object> job) { + super(job); + } + + protected GraphTraversal<Vertex, Vertex> constructSource( + String sourceLabel, + long sourceSample, + String sourceCLabel) { + GraphTraversal<Vertex, Vertex> t = this.graph().traversal() + .withSack(1f).V(); + + if (sourceLabel != null) { + t = t.hasLabel(sourceLabel); + } + + t = t.filter(it -> { + this.updateProgress(++this.progress); + return sourceCLabel == null ? 
true : + match(it.get(), C_LABEL, sourceCLabel); + }); + + if (sourceSample > 0L) { + t = t.sample((int) sourceSample); + } + + return t; + } + + protected GraphTraversal<Vertex, Vertex> constructPath( + GraphTraversal<Vertex, Vertex> t, long degree, + long sample, String sourceLabel, String sourceCLabel) { + GraphTraversal<?, Vertex> unit = constructPathUnit(degree, sample, + sourceLabel, + sourceCLabel); + t = t.as("v").repeat(__.local(unit).simplePath().as("v")); + + return t; + } + + protected GraphTraversal<Vertex, Vertex> constructPathUnit( + long degree, long sample, + String sourceLabel, + String sourceCLabel) { + GraphTraversal<Vertex, Vertex> unit = __.both(); + if (sourceLabel != null) { + unit = unit.hasLabel(sourceLabel); + } + if (sourceCLabel != null) { + unit = unit.has(C_LABEL, sourceCLabel); + } + if (degree != NO_LIMIT) { + unit = unit.limit(degree); + } + if (sample > 0L) { + unit = unit.sample((int) sample); + } + return unit; + } + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java new file mode 100644 index 000000000..ae1b8bb74 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java @@ -0,0 +1,101 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.job.algorithm.cent; + +import java.util.Map; + +import org.apache.tinkerpop.gremlin.process.traversal.Order; +import org.apache.tinkerpop.gremlin.process.traversal.P; +import org.apache.tinkerpop.gremlin.process.traversal.Pop; +import org.apache.tinkerpop.gremlin.process.traversal.Scope; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; +import org.apache.tinkerpop.gremlin.structure.Column; +import org.apache.tinkerpop.gremlin.structure.Vertex; + +import com.baidu.hugegraph.job.Job; + +public class BetweenessCentralityAlgorithm extends AbstractCentAlgorithm { + + @Override + public String name() { + return "betweeness_centrality"; + } + + @Override + public Object call(Job<Object> job, Map<String, Object> parameters) { + Traverser traverser = new Traverser(job); + return traverser.betweenessCentrality(depth(parameters), + degree(parameters), + sample(parameters), + sourceLabel(parameters), + sourceSample(parameters), + sourceCLabel(parameters), + top(parameters)); + } + + private static class Traverser extends AbstractCentAlgorithm.Traverser { + + public Traverser(Job<Object> job) { + super(job); + } + + public Object betweenessCentrality(int depth, + long degree, + long sample, + String sourceLabel, + long sourceSample, + String sourceCLabel, + long topN) { + assert depth > 0; + assert degree > 0L; + assert topN >= 0L; + + GraphTraversal<Vertex, Vertex> t = constructSource(sourceLabel, + sourceSample, + 
sourceCLabel); + t = constructPath(t, degree, sample, sourceLabel, sourceCLabel); + t = t.emit().until(__.loops().is(P.gte(depth))); + + @SuppressWarnings({ "unchecked", "deprecation" }) + GraphTraversal<Vertex, Vertex> tf = t.filter( + __.project("x","y","z") + .by(__.select(Pop.first, "v").id()) + .by(__.select(Pop.last, "v").id()) + .by(__.select(Pop.all, "v").count(Scope.local)) + .as("triple") + .coalesce(__.select("x","y").as("a") + .select("triples").unfold().as("t") + .select("x","y").where(P.eq("a")).select("t"), + __.store("triples")) + .select("z").as("length") + .select("triple").select("z").where(P.eq("length"))); + + GraphTraversal<Vertex, ?> tg = tf.select(Pop.all, "v") + .unfold().id() + .groupCount().order(Scope.local) + .by(Column.values, Order.desc); + GraphTraversal<Vertex, ?> tLimit = topN <= 0L ? tg : + tg.limit(Scope.local, topN); + + return this.execute(tLimit, () -> tLimit.next()); + } + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java new file mode 100644 index 000000000..d890db808 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java @@ -0,0 +1,111 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.job.algorithm.cent; + +import java.util.Map; + +import org.apache.tinkerpop.gremlin.process.traversal.Operator; +import org.apache.tinkerpop.gremlin.process.traversal.Order; +import org.apache.tinkerpop.gremlin.process.traversal.P; +import org.apache.tinkerpop.gremlin.process.traversal.Pop; +import org.apache.tinkerpop.gremlin.process.traversal.Scope; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; +import org.apache.tinkerpop.gremlin.structure.Column; +import org.apache.tinkerpop.gremlin.structure.Vertex; + +import com.baidu.hugegraph.job.Job; + +public class ClosenessCentralityAlgorithm extends AbstractCentAlgorithm { + + public static final long DEFAULT_DEGREE = 100L; + public static final long DEFAULT_SAMPLE = 1L; + + @Override + public String name() { + return "closeness_centrality"; + } + + @Override + public void checkParameters(Map<String, Object> parameters) { + depth(parameters); + } + + @Override + public Object call(Job<Object> job, Map<String, Object> parameters) { + Traverser traverser = new Traverser(job); + return traverser.closenessCentrality(depth(parameters), + degree(parameters), + sample(parameters), + sourceLabel(parameters), + sourceSample(parameters), + sourceCLabel(parameters), + top(parameters)); + } + + private static class Traverser extends AbstractCentAlgorithm.Traverser { + + public Traverser(Job<Object> job) { + super(job); + } + + public Object closenessCentrality(int depth, + long 
degree, + long sample, + String sourceLabel, + long sourceSample, + String sourceCLabel, + long topN) { + assert depth > 0; + assert degree > 0L; + assert topN >= 0L; + + GraphTraversal<Vertex, Vertex> t = constructSource(sourceLabel, + sourceSample, + sourceCLabel); + t = constructPath(t, degree, sample, sourceLabel, sourceCLabel); + t = t.emit().until(__.loops().is(P.gte(depth))); + + @SuppressWarnings({ "unchecked", "deprecation" }) + GraphTraversal<Vertex, Vertex> tf = t.filter( + __.project("x","y","z") + .by(__.select(Pop.first, "v").id()) + .by(__.select(Pop.last, "v").id()) + .by(__.select(Pop.all, "v").count(Scope.local)) + .as("triple") + .coalesce(__.select("x","y").as("a") + .select("triples").unfold().as("t") + .select("x","y").where(P.eq("a")).select("t"), + __.store("triples")) + .select("z").as("length") + .select("triple").select("z").where(P.eq("length"))); + + GraphTraversal<Vertex, ?> tg; + tg = tf.group().by(__.select(Pop.first, "v").id()) + .by(__.select(Pop.all, "v").count(Scope.local) + .sack(Operator.div).sack().sum()) + .order(Scope.local).by(Column.values, Order.desc); + GraphTraversal<Vertex, ?> tLimit = topN <= 0L ? tg : + tg.limit(Scope.local, topN); + + return this.execute(tLimit, () -> tLimit.next()); + } + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java new file mode 100644 index 000000000..81bd33672 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java @@ -0,0 +1,140 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.job.algorithm.cent; + +import java.util.Iterator; +import java.util.Map; + +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; +import org.apache.tinkerpop.gremlin.structure.Edge; +import org.apache.tinkerpop.gremlin.structure.Vertex; + +import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.job.Job; +import com.baidu.hugegraph.structure.HugeEdge; +import com.baidu.hugegraph.type.define.Directions; + +public class DegreeCentralityAlgorithm extends AbstractCentAlgorithm { + + @Override + public String name() { + return "degree_centrality"; + } + + @Override + public void checkParameters(Map<String, Object> parameters) { + direction(parameters); + top(parameters); + } + + @Override + public Object call(Job<Object> job, Map<String, Object> parameters) { + Traverser traverser = new Traverser(job); + return traverser.degreeCentrality(direction(parameters), + top(parameters)); + } + + private static class Traverser extends AlgoTraverser { + + public Traverser(Job<Object> job) { + super(job); + } + + public Object degreeCentrality(Directions direction, long topN) { + if (direction == null || direction == Directions.BOTH) { + return degreeCentrality(topN); + } + assert direction == Directions.OUT || direction == Directions.IN; + assert topN >= 0L; + + Iterator<Edge> edges = this.edges(direction); + + JsonMap 
degrees = new JsonMap(); + TopMap tops = new TopMap(topN); + Id vertex = null; + long degree = 0L; + long total = 0L; + + degrees.startObject(); + while (edges.hasNext()) { + HugeEdge edge = (HugeEdge) edges.next(); + this.updateProgress(++total); + + Id source = edge.ownerVertex().id(); + if (source.equals(vertex)) { + degree++; + continue; + } + if (vertex != null) { + if (topN <= 0L) { + degrees.append(vertex, degree); + } else { + tops.put(vertex, degree); + } + } + vertex = source; + degree = 1L; + } + + if (vertex != null) { + if (topN <= 0L) { + degrees.append(vertex, degree); + } else { + tops.put(vertex, degree); + degrees.append(tops.entrySet()); + } + } + + degrees.endObject(); + + return degrees.asJson(); + } + + protected Object degreeCentrality(long topN) { + assert topN >= 0L; + long total = 0L; + JsonMap degrees = new JsonMap(); + TopMap tops = new TopMap(topN); + + GraphTraversalSource traversal = this.graph().traversal(); + Iterator<Vertex> vertices = this.vertices(); + + degrees.startObject(); + while (vertices.hasNext()) { + Vertex source = vertices.next(); + this.updateProgress(++total); + + Long degree = traversal.V(source).bothE().count().next(); + if (topN <= 0L) { + degrees.append(source.id(), degree); + } else { + tops.put((Id) source.id(), degree); + } + } + + if (tops.size() > 0) { + degrees.append(tops.entrySet()); + } + degrees.endObject(); + + return degrees.asJson(); + } + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java new file mode 100644 index 000000000..d87fc7931 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java @@ -0,0 +1,100 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.job.algorithm.cent; + +import java.util.Map; + +import org.apache.tinkerpop.gremlin.process.traversal.Order; +import org.apache.tinkerpop.gremlin.process.traversal.Scope; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; +import org.apache.tinkerpop.gremlin.structure.Column; +import org.apache.tinkerpop.gremlin.structure.T; +import org.apache.tinkerpop.gremlin.structure.Vertex; + +import com.baidu.hugegraph.job.Job; + +public class EigenvectorCentralityAlgorithm extends AbstractCentAlgorithm { + + public static final long DEFAULT_DEGREE = 100L; + public static final long DEFAULT_SAMPLE = 1L; + + @Override + public String name() { + return "eigenvector_centrality"; + } + + @Override + public Object call(Job<Object> job, Map<String, Object> parameters) { + Traverser traverser = new Traverser(job); + return traverser.eigenvectorCentrality(depth(parameters), + degree(parameters), + sample(parameters), + sourceLabel(parameters), + sourceSample(parameters), + sourceCLabel(parameters), + top(parameters)); + } + + private static class Traverser extends AbstractCentAlgorithm.Traverser { + + public Traverser(Job<Object> job) { + super(job); + } + + public 
Object eigenvectorCentrality(int depth, + long degree, + long sample, + String sourceLabel, + long sourceSample, + String sourceCLabel, + long topN) { + assert depth > 0; + assert degree > 0L; + assert topN >= 0L; + + // TODO: support parameters: Directions dir, String label + /* + * g.V().repeat(groupCount('m').by(id) + * .local(both().limit(50).sample(1)) + * .simplePath()) + * .times(4).cap('m') + * .order(local).by(values, desc) + * .limit(local, 100) + */ + + GraphTraversal<Vertex, Vertex> t = constructSource(sourceLabel, + sourceSample, + sourceCLabel); + GraphTraversal<?, Vertex> unit = constructPathUnit(degree, sample, + sourceLabel, + sourceCLabel); + t = t.repeat(__.groupCount("m").by(T.id) + .local(unit).simplePath()).times(depth); + + GraphTraversal<Vertex, Object> tCap; + tCap = t.cap("m").order(Scope.local).by(Column.values, Order.desc); + GraphTraversal<Vertex, ?> tLimit = topN <= 0L ? tCap : + tCap.limit(Scope.local, topN); + + return this.execute(tLimit, () -> tLimit.next()); + } + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/AbstractCommAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/AbstractCommAlgorithm.java new file mode 100644 index 000000000..74b884a06 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/AbstractCommAlgorithm.java @@ -0,0 +1,78 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.job.algorithm.comm; + +import java.util.Map; + +import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm; +import com.baidu.hugegraph.traversal.algorithm.HugeTraverser; +import com.baidu.hugegraph.util.E; + +public abstract class AbstractCommAlgorithm extends AbstractAlgorithm { + + private static final int MAX_TIMES = 2048; + + @Override + public String category() { + return CATEGORY_COMM; + } + + protected static int times(Map<String, Object> parameters) { + if (!parameters.containsKey(KEY_TIMES)) { + return (int) DEFAULT_TIMES; + } + int times = parameterInt(parameters, KEY_TIMES); + HugeTraverser.checkPositiveOrNoLimit(times, KEY_TIMES); + E.checkArgument(times <= MAX_TIMES, + "The maximum number of iterations is %s, but got %s", + MAX_TIMES, times); + return times; + } + + protected static int stableTimes(Map<String, Object> parameters) { + if (!parameters.containsKey(KEY_STABLE_TIMES)) { + return (int) DEFAULT_STABLE_TIMES; + } + int times = parameterInt(parameters, KEY_STABLE_TIMES); + HugeTraverser.checkPositiveOrNoLimit(times, KEY_STABLE_TIMES); + E.checkArgument(times <= MAX_TIMES, + "The maximum number of stable iterations is %s, " + + "but got %s", MAX_TIMES, times); + return times; + } + + protected static double precision(Map<String, Object> parameters) { + if (!parameters.containsKey(KEY_PRECISION)) { + return DEFAULT_PRECISION; + } + double precision = parameterDouble(parameters, KEY_PRECISION); + E.checkArgument(0d < precision && precision < 1d, + "The %s parameter must be in range(0,1), but 
got: %s", + KEY_PRECISION, precision); + return precision; + } + + protected static String showCommunity(Map<String, Object> parameters) { + if (!parameters.containsKey(KEY_SHOW_COMM)) { + return null; + } + return parameterString(parameters, KEY_SHOW_COMM); + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java new file mode 100644 index 000000000..cc893fc1f --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java @@ -0,0 +1,70 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.job.algorithm.comm; + +import java.util.Map; + +import com.baidu.hugegraph.job.Job; +import com.baidu.hugegraph.type.define.Directions; +import com.baidu.hugegraph.util.InsertionOrderUtil; + +public class ClusterCoeffcientAlgorithm extends AbstractCommAlgorithm { + + @Override + public String name() { + return "cluster_coeffcient"; + } + + @Override + public void checkParameters(Map<String, Object> parameters) { + direction(parameters); + degree(parameters); + } + + @Override + public Object call(Job<Object> job, Map<String, Object> parameters) { + Traverser traverser = new Traverser(job); + return traverser.clusterCoeffcient(direction(parameters), + degree(parameters)); + } + + private static class Traverser extends TriangleCountAlgorithm.Traverser { + + public Traverser(Job<Object> job) { + super(job); + } + + public Object clusterCoeffcient(Directions direction, long degree) { + Map<String, Long> results = this.triangles(direction, degree); + results = InsertionOrderUtil.newMap(results); + + long triangles = results.remove(KEY_TRIANGLES); + long triads = results.remove(KEY_TRIADS); + assert triangles <= triads; + double coeffcient = triads == 0L ? 0d : 1d * triangles / triads; + + @SuppressWarnings({ "unchecked", "rawtypes" }) + Map<String, Double> converted = (Map) results; + converted.put("cluster_coeffcient", coeffcient); + + return results; + } + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java new file mode 100644 index 000000000..3f6de63e8 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java @@ -0,0 +1,83 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.job.algorithm.comm; + +import java.util.Map; + +import com.baidu.hugegraph.job.Job; +import com.baidu.hugegraph.util.E; + +public class LouvainAlgorithm extends AbstractCommAlgorithm { + + @Override + public String name() { + return "louvain"; + } + + @Override + public void checkParameters(Map<String, Object> parameters) { + times(parameters); + stableTimes(parameters); + precision(parameters); + degree(parameters); + sourceLabel(parameters); + sourceCLabel(parameters); + showCommunity(parameters); + clearPass(parameters); + } + + @Override + public Object call(Job<Object> job, Map<String, Object> parameters) { + String label = sourceLabel(parameters); + String clabel = sourceCLabel(parameters); + long degree = degree(parameters); + + LouvainTraverser traverser = new LouvainTraverser(job, degree, + label, clabel); + Long clearPass = clearPass(parameters); + String showComm = showCommunity(parameters); + try { + if (clearPass != null) { + return traverser.clearPass(clearPass.intValue()); + } else if (showComm != null) { + return traverser.showCommunity(showComm); + } else { + return traverser.louvain(times(parameters), + stableTimes(parameters), + precision(parameters)); + } + } catch (Throwable e) { + job.graph().tx().rollback(); + throw e; + } + 
} + + protected static Long clearPass(Map<String, Object> parameters) { + if (!parameters.containsKey(KEY_CLEAR)) { + return null; + } + long pass = parameterLong(parameters, KEY_CLEAR); + // TODO: change to checkNonNegative() + E.checkArgument(pass >= 0 || pass == -1, + "The %s parameter must be >= 0 or == -1, but got %s", + KEY_CLEAR, pass); + return pass; + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java new file mode 100644 index 000000000..0b3d674aa --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java @@ -0,0 +1,715 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
 */

package com.baidu.hugegraph.job.algorithm.comm;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.slf4j.Logger;

import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.id.IdGenerator;
import com.baidu.hugegraph.exception.ExistedException;
import com.baidu.hugegraph.iterator.ListIterator;
import com.baidu.hugegraph.job.Job;
import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm;
import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm.AlgoTraverser;
import com.baidu.hugegraph.schema.SchemaLabel;
import com.baidu.hugegraph.schema.SchemaManager;
import com.baidu.hugegraph.schema.VertexLabel;
import com.baidu.hugegraph.structure.HugeEdge;
import com.baidu.hugegraph.structure.HugeVertex;
import com.baidu.hugegraph.type.define.Directions;
import com.baidu.hugegraph.util.Log;
import com.google.common.collect.ImmutableMap;

/**
 * Traverser that runs Louvain community detection as an OLAP job.
 *
 * Each pass has two phases: {@link #moveCommunities(int)} repeatedly moves
 * vertices into the neighbor community with the best modularity gain, then
 * {@link #mergeCommunities(int)} stores every non-empty community as a
 * vertex (and the inter-community weights as edges) labelled
 * {@code c_pass-N}. The next pass uses those stored vertices as its input.
 */
public class LouvainTraverser extends AlgoTraverser {

    // prefix of the vertex/edge label that holds the result of pass N
    public static final String C_PASS = "c_pass-";
    // property: inner weight of a stored community vertex
    public static final String C_KIN = "c_kin";
    // property: weight of a stored community edge
    public static final String C_WEIGHT = "c_weight";
    // property: ids (as text) of the members merged into a community vertex
    public static final String C_MEMBERS = "c_members";

    // property written by LPA, used here to filter the source vertices
    public static final String C_LABEL = LpaAlgorithm.Traverser.C_LABEL;

    private static final long LIMIT = AbstractAlgorithm.MAX_QUERY_LIMIT;

    private static final Logger LOG = Log.logger(LouvainTraverser.class);

    private final GraphTraversalSource g;
    // total number of edges in the graph, counted once in the constructor
    private final long m;
    private final String sourceLabel;
    private final String sourceCLabel;
    // max number of edges fetched per vertex in neighbors()
    private final long degree;
    private final Cache cache;

    // label of the pass currently being written, "" before any merge
    private String passLabel;

    /**
     * @param job          the job this traverser runs in
     * @param degree       limit passed to edgesOfVertex() for each vertex
     * @param sourceLabel  label of the vertices read in pass 0
     * @param sourceCLabel if not null, only vertices whose c_label property
     *                     matches this value are considered
     */
    public LouvainTraverser(Job<Object> job, long degree,
                            String sourceLabel, String sourceCLabel) {
        super(job);
        this.g = this.graph().traversal();
        this.m = this.g.E().count().next();
        this.sourceLabel = sourceLabel;
        this.sourceCLabel = sourceCLabel;
        this.degree = degree;
        this.passLabel = "";

        this.cache = new Cache();
    }

    /**
     * Alternative id scheme for merged community vertices, currently unused
     * (ids are generated by Cache.genId() instead).
     */
    @SuppressWarnings("unused")
    private Id genId2(int pass, Id cid) {
        // gen id for merge-community vertex
        String id = cid.toString();
        if (pass == 0) {
            // concat pass with cid
            id = pass + "~" + id;
        } else {
            // replace last pass with current pass
            String lastPass = String.valueOf(pass - 1);
            assert id.startsWith(lastPass);
            id = id.substring(lastPass.length());
            id = pass + id;
        }
        return IdGenerator.of(id);
    }

    /**
     * Create the property keys used by the result schema.
     *
     * @throws IllegalArgumentException if the vertex or edge label of pass 0
     *         already exists, i.e. results of a previous run are still there
     */
    private void defineSchemaOfPk() {
        String label = this.labelOfPassN(0);
        if (this.graph().existsVertexLabel(label) ||
            this.graph().existsEdgeLabel(label)) {
            throw new IllegalArgumentException(
                      "Please clear historical results before proceeding");
        }

        SchemaManager schema = this.graph().schema();
        schema.propertyKey(C_KIN).asInt()
              .ifNotExist().create();
        schema.propertyKey(C_MEMBERS).valueSet().asText()
              .ifNotExist().create();
        schema.propertyKey(C_WEIGHT).asFloat()
              .ifNotExist().create();
    }

    /**
     * Create the vertex label and edge label for the given pass and make it
     * the current passLabel.
     *
     * @throws IllegalArgumentException if either label already exists
     */
    private void defineSchemaOfPassN(int pass) {
        this.passLabel = labelOfPassN(pass);

        SchemaManager schema = this.graph().schema();
        try {
            schema.vertexLabel(this.passLabel).useCustomizeStringId()
                  .properties(C_KIN, C_MEMBERS)
                  .nullableKeys(C_KIN, C_MEMBERS)
                  .create();
            schema.edgeLabel(this.passLabel)
                  .sourceLabel(this.passLabel)
                  .targetLabel(this.passLabel)
                  .properties(C_WEIGHT)
                  .create();
        } catch (ExistedException e) {
            throw new IllegalArgumentException(
                      "Please clear historical results before proceeding", e);
        }
    }

    /** Names of all edge labels that start with the c_pass- prefix. */
    private List<String> cpassEdgeLabels() {
        List<String> names = new ArrayList<>();
        for (SchemaLabel label : this.graph().schema().getEdgeLabels()) {
            String name = label.name();
            if (name.startsWith(C_PASS)) {
                names.add(name);
            }
        }
        return names;
    }

    /** Names of all vertex labels that start with the c_pass- prefix. */
    private List<String> cpassVertexLabels() {
        List<String> names = new ArrayList<>();
        for (SchemaLabel label : this.graph().schema().getVertexLabels()) {
            String name = label.name();
            if (name.startsWith(C_PASS)) {
                names.add(name);
            }
        }
        return names;
    }

    private String labelOfPassN(int n) {
        return C_PASS + n;
    }

    /**
     * Weight of one edge: the c_weight property if present, otherwise 1.
     * Edges of a c_pass-N label are expected to always carry c_weight.
     */
    private float weightOfEdge(Edge e) {
        if (e.label().startsWith(C_PASS)) {
            assert e.property(C_WEIGHT).isPresent();
            return e.value(C_WEIGHT);
        } else if (e.property(C_WEIGHT).isPresent()) {
            return e.value(C_WEIGHT);
        }
        return 1f;
    }

    /** Sum of weightOfEdge() over the given edges. */
    private float weightOfEdges(List<Edge> edges) {
        float weight = 0f;
        for (Edge edge : edges) {
            weight += weightOfEdge(edge);
        }
        return weight;
    }

    /** Add a community vertex of the current pass to the graph. */
    private Vertex newCommunityNode(Id cid, int kin, List<String> members) {
        assert !members.isEmpty() : members;
        return this.graph().addVertex(T.label, this.passLabel, T.id, cid,
                                      C_KIN, kin, C_MEMBERS, members);
    }

    /**
     * Build an in-memory vertex object of the current pass label for the
     * given id; nothing is added to the graph here. It is used as the target
     * when adding a community edge.
     */
    private Vertex makeCommunityNode(Id cid) {
        VertexLabel vl = this.graph().vertexLabel(this.passLabel);
        return new HugeVertex(this.graph(), cid, vl);
    }

    private Edge newCommunityEdge(Vertex source, Vertex target, float weight) {
        return source.addEdge(this.passLabel, target, C_WEIGHT, weight);
    }

    /**
     * Store one merged community: its vertex plus one edge per entry of
     * cedges (other community id -> accumulated weight).
     */
    private void insertNewCommunity(int pass, Id cid, int kin,
                                    List<String> members,
                                    Map<Id, MutableInt> cedges) {
        // create backend vertex if it's the first time
        Id vid = this.cache.genId(pass, cid);
        Vertex node = this.newCommunityNode(vid, kin, members);
        commitIfNeeded();
        // update backend vertex edges
        for (Map.Entry<Id, MutableInt> e : cedges.entrySet()) {
            float weight = e.getValue().floatValue();
            vid = this.cache.genId(pass, e.getKey());
            Vertex targetV = this.makeCommunityNode(vid);
            this.newCommunityEdge(node, targetV, weight);
            commitIfNeeded();
        }
        LOG.debug("Add new comm: {} kin={} size={}", node, kin, members.size());
    }

    /**
     * Whether vertex v must be ignored in the given pass: c_pass-* vertices
     * that do not belong to the previous pass, or vertices that fail the
     * sourceCLabel filter.
     */
    private boolean needSkipVertex(int pass, Vertex v) {
        // skip the old intermediate data when first pass
        String label = v.label();
        if (label.startsWith(C_PASS)) {
            if (pass == 0) {
                return true;
            }
            String lastPassLabel = labelOfPassN(pass - 1);
            if (!label.equals(lastPassLabel)) {
                return true;
            }
        }
        // skip the vertex with unmatched clabel
        if (this.sourceCLabel != null &&
            !match(v, C_LABEL, this.sourceCLabel)) {
            return true;
        }
        return false;
    }

    /**
     * Input vertices of a pass: the sourceLabel vertices for pass 0, the
     * community vertices stored by the previous pass otherwise.
     */
    private Iterator<Vertex> sourceVertices(int pass) {
        if (pass > 0) {
            // all vertices of merged community
            String lastPassLabel = labelOfPassN(pass - 1);
            return this.vertices(lastPassLabel, LIMIT);
        } else {
            assert pass == 0;
            // all vertices at the first time
            return this.vertices(this.sourceLabel, LIMIT);
        }
    }

    /** Edges of vertex vid in both directions, at most `degree` of them. */
    private List<Edge> neighbors(Id vid) {
        Iterator<Edge> nbs = this.edgesOfVertex(vid, Directions.BOTH,
                                                (Id) null, this.degree);
        @SuppressWarnings("resource")
        ListIterator<Edge> list = new ListIterator<>(LIMIT, nbs);
        return (List<Edge>) list.list();
    }

    /**
     * Total weight of the edges of v, cached per vertex id.
     *
     * @param edges the already fetched edges of v, or null to fetch them
     */
    private float weightOfVertex(Vertex v, List<Edge> edges) {
        Float value = this.cache.vertexWeight((Id) v.id());
        if (value != null) {
            return value;
        }
        if (edges == null) {
            edges = neighbors((Id) v.id());
        }
        float weight = weightOfEdges(edges);
        this.cache.vertexWeight((Id) v.id(), weight);
        return weight;
    }

    /** The stored c_kin of a community vertex, 0 for any other vertex. */
    private int kinOfVertex(Vertex v) {
        if (v.label().startsWith(C_PASS) && v.property(C_KIN).isPresent()) {
            return v.value(C_KIN);
        }
        return 0;
    }

    /** Id of the community v currently belongs to, or its own id if none. */
    private Id cidOfVertex(Vertex v) {
        Id vid = (Id) v.id();
        Community c = this.cache.vertex2Community(vid);
        return c != null ? c.cid : vid;
    }

    // 1: wrap original vertex as community node
    // 2: add original vertices to community node,
    //    and save as community vertex when merge()
    // 3: wrap community vertex as community node,
    //    and repeat step 2 and step 3.
    private Community wrapCommunity(Vertex otherV) {
        Id vid = (Id) otherV.id();
        Community comm = this.cache.vertex2Community(vid);
        if (comm != null) {
            return comm;
        }

        comm = new Community(vid);
        comm.add(this, otherV, null); // will traverse the neighbors of otherV
        this.cache.vertex2Community(vid, comm);
        return comm;
    }

    /**
     * Group the given edges by the community of their other endpoint and
     * accumulate twice the edge weight per community.
     */
    private Collection<Pair<Community, MutableInt>> nbCommunities(
                                                    int pass,
                                                    List<Edge> edges) {
        // comms is a map of cid:[community,weight]
        Map<Id, Pair<Community, MutableInt>> comms = new HashMap<>();
        for (Edge edge : edges) {
            Vertex otherV = ((HugeEdge) edge).otherVertex();
            if (needSkipVertex(pass, otherV)) {
                // skip the old intermediate data, or filter clabel
                continue;
            }
            Community c = wrapCommunity(otherV);
            if (!comms.containsKey(c.cid)) {
                comms.put(c.cid, Pair.of(c, new MutableInt(0)));
            }
            // calc weight between source vertex and neighbor community
            // NOTE(review): a float is added to a MutableInt here, so a
            // fractional weight is presumably truncated - confirm intended
            comms.get(c.cid).getRight().add(2 * weightOfEdge(edge));
        }
        return comms.values();
    }

    /** Move v (with its edges nbs) from its current community into newC. */
    private void moveCommunity(Vertex v, List<Edge> nbs, Community newC) {
        Id vid = (Id) v.id();

        // remove v from old community
        Community oldC = this.cache.vertex2Community(vid);
        if (oldC != null) {
            oldC.remove(this, v, nbs);
        }

        // add v to new community
        newC.add(this, v, nbs);
        LOG.debug("Move {} to comm: {}", v, newC);

        // update community of v
        this.cache.vertex2Community(vid, newC);
    }

    /**
     * One sweep of phase 1: visit every source vertex of the pass and move
     * it to the neighbor community with the largest positive gain.
     *
     * @return the fraction of visited (non-skipped) vertices that moved,
     *         0 if no vertex was visited
     */
    private double moveCommunities(int pass) {
        Iterator<Vertex> vertices = this.sourceVertices(pass);

        // shuffle
        //r = r.order().by(shuffle);

        long total = 0L;
        long moved = 0L;
        while (vertices.hasNext()) {
            this.updateProgress(++this.progress);
            Vertex v = vertices.next();
            if (needSkipVertex(pass, v)) {
                // skip the old intermediate data, or filter clabel
                continue;
            }
            total++;
            Id cid = cidOfVertex(v);
            List<Edge> nbs = neighbors((Id) v.id());
            double ki = kinOfVertex(v) + weightOfVertex(v, nbs);
            // update community of v if △Q changed
            double maxDeltaQ = 0d;
            Community bestComm = null;
            // list all neighbor communities of v
            for (Pair<Community, MutableInt> nbc : nbCommunities(pass, nbs)) {
                // △Q = (Ki_in - Ki * Etot / m) / 2m
                Community otherC = nbc.getLeft();
                // weight between c and otherC
                double kiin = nbc.getRight().floatValue();
                // weight of otherC
                int tot = otherC.kin() + otherC.kout();
                if (cid.equals(otherC.cid)) {
                    // v is already in otherC: exclude its own contribution
                    tot -= ki;
                    assert tot >= 0;
                    // expect tot >= 0, but may be something wrong?
                    if (tot < 0) {
                        tot = 0;
                    }
                }
                double deltaQ = kiin - ki * tot / this.m;
                if (deltaQ > maxDeltaQ) {
                    // TODO: cache otherC for neighbors the same community
                    maxDeltaQ = deltaQ;
                    bestComm = otherC;
                }
            }
            // bestComm is non-null whenever maxDeltaQ > 0
            if (maxDeltaQ > 0d && !cid.equals(bestComm.cid)) {
                moved++;
                // move v to the community of maxQ neighbor
                moveCommunity(v, nbs, bestComm);
            }
        }

        // may keep oscillating when a degree limit is set
        return total == 0L ? 0d : (double) moved / total;
    }

    /**
     * Phase 2: store every non-empty community of the cache as a vertex of
     * the current pass, together with its edges to other communities, then
     * commit and reset the cache.
     */
    private void mergeCommunities(int pass) {
        // merge each community as a vertex
        Collection<Pair<Community, Set<Id>>> comms = this.cache.communities();
        this.cache.resetVertexWeight();
        for (Pair<Community, Set<Id>> pair : comms) {
            Community c = pair.getKey();
            if (c.empty()) {
                continue;
            }
            // update kin and edges between communities
            int kin = c.kin();
            Set<Id> vertices = pair.getRight();
            assert !vertices.isEmpty();
            List<String> members = new ArrayList<>(vertices.size());
            Map<Id, MutableInt> cedges = new HashMap<>(vertices.size());
            for (Id v : vertices) {
                members.add(v.toString());
                // collect edges between this community and other communities
                List<Edge> neighbors = neighbors(v);
                for (Edge edge : neighbors) {
                    Vertex otherV = ((HugeEdge) edge).otherVertex();
                    if (vertices.contains(otherV.id())) {
                        // inner edges of this community, will be calc twice
                        // due to both e-in and e-out are in vertices,
                        // NOTE(review): float weight is narrowed to int by
                        // the compound assignment - confirm intended
                        kin += weightOfEdge(edge);
                        continue;
                    }
                    Id otherCid = cidOfVertex(otherV);
                    if (otherCid.compareTo(c.cid) < 0) {
                        // skip if it should be collected by otherC
                        continue;
                    }
                    if (!cedges.containsKey(otherCid)) {
                        cedges.put(otherCid, new MutableInt(0));
                    }
                    cedges.get(otherCid).add(weightOfEdge(edge));
                }
            }
            // insert new community vertex and edges into storage
            this.insertNewCommunity(pass, c.cid, kin, members, cedges);
        }
        this.graph().tx().commit();
        // reset communities
        this.cache.reset();
    }

    /**
     * Run the Louvain algorithm.
     *
     * @param maxTimes    maximum number of passes
     * @param stableTimes number of sweeps with a tiny change after which
     *                    phase 1 of a pass is stopped
     * @param precision   threshold used to decide that a change is tiny
     * @return a map with the keys pass_times, phase1_times, last_precision,
     *         times and communities
     * @throws IllegalArgumentException if results of a previous run exist
     */
    public Object louvain(int maxTimes, int stableTimes, double precision) {
        assert maxTimes > 0;
        assert precision > 0d;

        this.defineSchemaOfPk();

        /*
         * iterate until it has stabilized or
         * the maximum number of times is reached
         */
        int times = maxTimes;
        int movedTimes = 0;
        double movedPercent = 0d;
        double lastMovedPercent = 0d;

        for (int i = 0; i < maxTimes; i++) {
            boolean finished = true;
            movedPercent = 0d;
            lastMovedPercent = 1d;
            int tinyChanges = 0;
            while ((movedPercent = this.moveCommunities(i)) > 0d) {
                movedTimes++;
                finished = false;
                if (lastMovedPercent - movedPercent < precision) {
                    tinyChanges++;
                }
                if (i == 0 && movedPercent < precision) {
                    // stop the first round of iterations early
                    break;
                }
                if (tinyChanges >= stableTimes) {
                    // maybe always shaking and falling into a dead loop
                    break;
                }
                lastMovedPercent = movedPercent;
            }
            if (finished) {
                // nothing moved in this pass: no new pass label is created
                times = i;
                break;
            } else {
                this.defineSchemaOfPassN(i);
                this.mergeCommunities(i);
            }
        }

        long communities = 0L;
        String commLabel = this.passLabel;
        if (!commLabel.isEmpty()) {
            GraphTraversal<?, Long> t = this.g.V().hasLabel(commLabel).count();
            communities = this.execute(t, t::next);
        }
        return ImmutableMap.of("pass_times", times,
                               "phase1_times", movedTimes,
                               "last_precision", movedPercent,
                               "times", maxTimes,
                               "communities", communities);
    }

    /**
     * Modularity computed from the stored community vertices and edges of
     * the given pass.
     */
    public double modularity(int pass) {
        // pass: label the last pass
        String label = labelOfPassN(pass);
        Number kin = this.g.V().hasLabel(label).values(C_KIN).sum().next();
        Number weight = this.g.E().hasLabel(label).values(C_WEIGHT).sum().next();
        double m = kin.intValue() + weight.floatValue() * 2.0d;
        double q = 0.0d;
        Iterator<Vertex> coms = this.g.V().hasLabel(label);
        while (coms.hasNext()) {
            Vertex com = coms.next();
            int cin = com.value(C_KIN);
            Number cout = this.g.V(com).bothE().values(C_WEIGHT).sum().next();
            double cdegree = cin + cout.floatValue();
            // Q = ∑(I/M - ((2I+O)/2M)^2)
            q += cin / m - Math.pow(cdegree / m, 2);
        }
        return q;
    }

    /**
     * Expand a community id into its members by following the c_members
     * property level by level, until vertices of pass 0 have been expanded
     * or nothing is left to expand.
     */
    public Collection<Object> showCommunity(String community) {
        final String C_PASS0 = labelOfPassN(0);
        Collection<Object> comms = Arrays.asList(community);
        boolean reachPass0 = false;
        while (comms.size() > 0 && !reachPass0) {
            Iterator<Vertex> subComms = this.vertices(comms.iterator());
            comms = new HashSet<>();
            while (subComms.hasNext()) {
                this.updateProgress(++this.progress);
                Vertex sub = subComms.next();
                if (sub.property(C_MEMBERS).isPresent()) {
                    Set<Object> members = sub.value(C_MEMBERS);
                    reachPass0 = sub.label().equals(C_PASS0);
                    comms.addAll(members);
                }
            }
        }
        return comms;
    }

    /**
     * Drop the stored result (edges, vertices and their labels) of one pass,
     * or of all passes if pass is negative.
     *
     * @return the value of the progress counter
     */
    public long clearPass(int pass) {
        GraphTraversal<Edge, Edge> te = this.g.E();
        if (pass < 0) {
            // drop edges of all pass
            List<String> els = this.cpassEdgeLabels();
            if (els.size() > 0) {
                String first = els.remove(0);
                te = te.hasLabel(first, els.toArray(new String[els.size()]));
                this.drop(te);
            }
            // drop schema
            for (String label : this.cpassEdgeLabels()) {
                this.graph().schema().edgeLabel(label).remove();
            }
        } else {
            // drop edges of pass N
            String label = labelOfPassN(pass);
            if (this.graph().existsEdgeLabel(label)) {
                te = te.hasLabel(label);
                this.drop(te);
                // drop schema
                this.graph().schema().edgeLabel(label).remove();
            }
        }

        GraphTraversal<Vertex, Vertex> tv = this.g.V();
        if (pass < 0) {
            // drop vertices of all pass
            List<String> vls = this.cpassVertexLabels();
            if (vls.size() > 0) {
                String first = vls.remove(0);
                tv = tv.hasLabel(first, vls.toArray(new String[vls.size()]));
                this.drop(tv);
            }
            // drop schema
            for (String label : this.cpassVertexLabels()) {
                this.graph().schema().vertexLabel(label).remove();
            }
        } else {
            // drop vertices of pass N
            String label = labelOfPassN(pass);
            if (this.graph().existsVertexLabel(label)) {
                tv = tv.hasLabel(label);
                this.drop(tv);
                // drop schema
                this.graph().schema().vertexLabel(label).remove();
            }
        }

        return this.progress;
    }

    /**
     * In-memory aggregate of one community: its id, member count and the
     * accumulated kin/kout of its members.
     */
    private static class Community {

        // community id (stored as a backend vertex)
        private final Id cid;
        // community members size
        private int size = 0;
        /*
         * weight of all edges in community(2X), sum of kin of new members
         *  [each is from the last pass, stored in backend vertex]
         */
        private int kin = 0;
        /*
         * weight of all edges between communities, sum of kout of new members
         * [each is last pass, calculated in real time by neighbors]
         */
        private int kout = 0;

        public Community(Id cid) {
            this.cid = cid;
        }

        public boolean empty() {
            return this.size <= 0;
        }

        /** Add member v; nbs may be null, then its edges are fetched. */
        public void add(LouvainTraverser t, Vertex v, List<Edge> nbs) {
            this.size++;
            this.kin += t.kinOfVertex(v);
            // NOTE(review): float vertex weight is narrowed to int here
            this.kout += t.weightOfVertex(v, nbs);
        }

        /** Remove member v, reverting what add() accumulated for it. */
        public void remove(LouvainTraverser t, Vertex v, List<Edge> nbs) {
            this.size--;
            this.kin -= t.kinOfVertex(v);
            this.kout -= t.weightOfVertex(v, nbs);
        }

        public int kin() {
            return this.kin;
        }

        public int kout() {
            return this.kout;
        }

        @Override
        public String toString() {
            return String.format("[%s](size=%s kin=%s kout=%s)",
                                 this.cid , this.size, this.kin, this.kout);
        }
    }

    /**
     * In-memory state of the current pass: vertex weights, the community of
     * each vertex, and the sequence numbers used to generate community ids.
     */
    private static class Cache {

        private final Map<Id, Float> vertexWeightCache;
        private final Map<Id, Community> vertex2Community;
        private final Map<Id, Integer> genIds;

        public Cache() {
            this.vertexWeightCache = new HashMap<>();
            this.vertex2Community = new HashMap<>();
            this.genIds = new HashMap<>();
        }

        /** The community of the vertex, or null if it has none yet. */
        public Community vertex2Community(Id id) {
            return this.vertex2Community.get(id);
        }

        public void vertex2Community(Id id, Community c) {
            this.vertex2Community.put(id, c);
        }

        /** The cached weight of the vertex, or null if not cached. */
        public Float vertexWeight(Id id) {
            return this.vertexWeightCache.get(id);
        }

        public void vertexWeight(Id id, float weight) {
            this.vertexWeightCache.put(id, weight);
        }

        /** Clear all three maps. */
        public void reset() {
            this.vertexWeightCache.clear();
            this.vertex2Community.clear();
            this.genIds.clear();
        }

        public void resetVertexWeight() {
            this.vertexWeightCache.clear();
        }

        /**
         * Id of the stored community vertex for cid: "pass~N", where N is a
         * sequence number assigned the first time cid is seen since the last
         * reset().
         */
        public Id genId(int pass, Id cid) {
            if (!this.genIds.containsKey(cid)) {
                this.genIds.put(cid, this.genIds.size() + 1);
            }
            String id = pass + "~" + this.genIds.get(cid);
            return IdGenerator.of(id);
        }

        /**
         * All non-empty communities, each paired with the ids of the
         * vertices currently mapped to it.
         */
        public Collection<Pair<Community, Set<Id>>> communities(){
            // TODO: get communities from backend store instead of ram
            Map<Id, Pair<Community, Set<Id>>> comms = new HashMap<>();
            for (Entry<Id, Community> e : this.vertex2Community.entrySet()) {
                Community c = e.getValue();
                if (c.empty()) {
                    continue;
                }
                Pair<Community, Set<Id>> pair = comms.get(c.cid);
                if (pair == null) {
                    pair = Pair.of(c, new HashSet<>());
                    comms.put(c.cid, pair);
                }
                // collect members joined to the community [current pass]
                pair.getRight().add(e.getKey());
            }
            return comms.values();
        }
    }
}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java
new file mode 100644
index 000000000..af7b299ae
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java
@@ -0,0 +1,263 @@
/*
 * Copyright 2017 HugeGraph Authors
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
+ */ + +package com.baidu.hugegraph.job.algorithm.comm; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.tinkerpop.gremlin.process.traversal.Scope; +import org.apache.tinkerpop.gremlin.structure.Vertex; + +import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.job.Job; +import com.baidu.hugegraph.schema.SchemaManager; +import com.baidu.hugegraph.schema.VertexLabel; +import com.baidu.hugegraph.type.define.Directions; +import com.google.common.collect.ImmutableMap; + +public class LpaAlgorithm extends AbstractCommAlgorithm { + + @Override + public String name() { + return "lpa"; + } + + @Override + public void checkParameters(Map<String, Object> parameters) { + times(parameters); + precision(parameters); + sourceLabel(parameters); + edgeLabel(parameters); + direction(parameters); + degree(parameters); + showCommunity(parameters); + } + + @Override + public Object call(Job<Object> job, Map<String, Object> parameters) { + Traverser traverser = new Traverser(job); + String showComm = showCommunity(parameters); + + try { + if (showComm != null) { + return traverser.showCommunity(showComm); + } else { + return traverser.lpa(sourceLabel(parameters), + edgeLabel(parameters), + direction(parameters), + degree(parameters), + times(parameters), + precision(parameters)); + } + } catch (Throwable e) { + job.graph().tx().rollback(); + throw e; + } + } + + public static class Traverser extends AlgoTraverser { + + public static final String C_LABEL = "c_label"; + private static final long LIMIT = MAX_QUERY_LIMIT; + + private final Random R = new Random(); + + public Traverser(Job<Object> job) { + super(job); + } + + public Object lpa(String sourceLabel, String edgeLabel, + Directions dir, long degree, + int maxTimes, double precision) { + assert maxTimes > 0; + assert precision > 0d; + 
+ this.initSchema(); + + int times = maxTimes; + double changedPercent = 0d; + + /* + * Iterate until: + * 1.it has stabilized + * 2.or the maximum number of times is reached + */ + for (int i = 0; i < maxTimes; i++) { + changedPercent = this.detectCommunities(sourceLabel, edgeLabel, + dir, degree); + if (changedPercent <= precision) { + times = i + 1; + break; + } + } + + long communities = this.graph().traversal().V().limit(10000L) + .groupCount().by(C_LABEL) + .count(Scope.local).next(); + return ImmutableMap.of("iteration_times", times, + "last_precision", changedPercent, + "times", maxTimes, + "communities", communities); + } + + public Object showCommunity(String clabel) { + // all vertices with specified c-label + Iterator<Vertex> vertices = this.vertices(LIMIT); + vertices = filter(vertices, C_LABEL, clabel); + + JsonMap json = new JsonMap(); + json.startList(); + while (vertices.hasNext()) { + this.updateProgress(++this.progress); + json.append(vertices.next().id().toString()); + } + json.endList(); + + return json.asJson(); + } + + private double detectCommunities(String sourceLabel, String edgeLabel, + Directions dir, long degree) { + // shuffle: r.order().by(shuffle) + // r = this.graph().traversal().V().sample((int) LIMIT); + + // all vertices + Iterator<Vertex> vertices = this.vertices(sourceLabel, LIMIT); + + long total = 0L; + long changed = 0L; + while (vertices.hasNext()) { + this.updateProgress(++this.progress); + total++; + Vertex v = vertices.next(); + String label = this.voteCommunityOfVertex(v, edgeLabel, + dir, degree); + // update label if it's absent or changed + if (!labelPresent(v) || !label.equals(this.labelOfVertex(v))) { + changed++; + this.updateLabelOfVertex(v, label); + } + } + this.graph().tx().commit(); + + return total == 0L ? 
0d : (double) changed / total; + } + + private String voteCommunityOfVertex(Vertex vertex, String edgeLabel, + Directions dir, long degree) { + // neighbors of source vertex v + Id source = (Id) vertex.id(); + Id labelId = this.getEdgeLabelId(edgeLabel); + Iterator<Id> neighbors = this.adjacentVertices(source, dir, + labelId, degree); + + // whether or not include vertex itself, greatly affects the result. + // get a larger number of small communities if include itself + //neighbors.inject(v); + + // calculate label frequency + Map<String, MutableInt> labels = new HashMap<>(); + while (neighbors.hasNext()) { + String label = this.labelOfVertex(neighbors.next()); + if (label == null) { + // ignore invalid or not-exist vertex + continue; + } + MutableInt labelCount = labels.get(label); + if (labelCount != null) { + labelCount.increment(); + } else { + labels.put(label, new MutableInt(1)); + } + } + + // isolated vertex + if (labels.size() == 0) { + return this.labelOfVertex(vertex); + } + + // get the labels with maximum frequency + List<String> maxLabels = new ArrayList<>(); + int maxFreq = 1; + for (Map.Entry<String, MutableInt> e : labels.entrySet()) { + int value = e.getValue().intValue(); + if (value > maxFreq) { + maxFreq = value; + maxLabels.clear(); + } + if (value == maxFreq) { + maxLabels.add(e.getKey()); + } + } + + /* + * TODO: + * keep origin label with probability to prevent monster communities + */ + + // random choice + int selected = this.R.nextInt(maxLabels.size()); + return maxLabels.get(selected); + } + + private boolean labelPresent(Vertex vertex) { + return vertex.property(C_LABEL).isPresent(); + } + + private String labelOfVertex(Vertex vertex) { + if (!labelPresent(vertex)) { + return vertex.id().toString(); + } + return vertex.value(C_LABEL); + } + + private String labelOfVertex(Id vid) { + // TODO: cache with Map<Id, String> + Iterator<Vertex> iter = this.graph().vertices(vid); + if (!iter.hasNext()) { + return null; + } + Vertex vertex = 
iter.next(); + return this.labelOfVertex(vertex); + } + + private void updateLabelOfVertex(Vertex v, String label) { + // TODO: cache with Map<Id, String> + v.property(C_LABEL, label); + this.commitIfNeeded(); + } + + private void initSchema() { + String cl = C_LABEL; + SchemaManager schema = this.graph().schema(); + schema.propertyKey(cl).asText().ifNotExist().create(); + for (VertexLabel vl : schema.getVertexLabels()) { + schema.vertexLabel(vl.name()) + .properties(cl).nullableKeys(cl) + .append(); + } + } + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java new file mode 100644 index 000000000..c47d19f65 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java @@ -0,0 +1,153 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.job.algorithm.comm; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import org.apache.tinkerpop.gremlin.structure.Edge; + +import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.job.Job; +import com.baidu.hugegraph.structure.HugeEdge; +import com.baidu.hugegraph.type.define.Directions; +import com.baidu.hugegraph.util.InsertionOrderUtil; +import com.google.common.collect.ImmutableMap; + +public class TriangleCountAlgorithm extends AbstractCommAlgorithm { + + @Override + public String name() { + return "triangle_count"; + } + + @Override + public void checkParameters(Map<String, Object> parameters) { + direction(parameters); + degree(parameters); + } + + @Override + public Object call(Job<Object> job, Map<String, Object> parameters) { + Traverser traverser = new Traverser(job); + return traverser.triangleCount(direction(parameters), + degree(parameters)); + } + + protected static class Traverser extends AlgoTraverser { + + protected static final String KEY_TRIANGLES = "triangles"; + protected static final String KEY_TRIADS = "triads"; + + public Traverser(Job<Object> job) { + super(job); + } + + public Object triangleCount(Directions direction, long degree) { + Map<String, Long> results = triangles( direction, degree); + results = InsertionOrderUtil.newMap(results); + results.remove(KEY_TRIADS); + return results; + } + + protected Map<String, Long> triangles(Directions direction, + long degree) { + if (direction == null || direction == Directions.BOTH) { + throw new IllegalArgumentException("Direction must be OUT/IN"); + } + assert direction == Directions.OUT || direction == Directions.IN; + + Iterator<Edge> edges = this.edges(direction); + + long triangles = 0L; + long triads = 0L; + long total = 0L; + long totalVertices = 0L; + Id vertex = null; + + Set<Id> adjVertices = new HashSet<>(); + while (edges.hasNext()) { + HugeEdge edge = (HugeEdge) edges.next(); 
+ this.updateProgress(++total); + + Id source = edge.ownerVertex().id(); + Id target = edge.otherVertex().id(); + if (vertex == source) { + // Ignore and skip the target vertex if exceed degree + if (adjVertices.size() < degree || degree == NO_LIMIT) { + adjVertices.add(target); + } + continue; + } + + if (vertex != null) { + assert vertex != source; + /* + * Find graph mode like this: + * A -> [B,C,D,E,F] + * B -> [D,F] + * E -> [B,C,F] + */ + triangles += this.intersect(direction, degree, adjVertices); + triads += this.localTriads(adjVertices.size()); + totalVertices++; + // Reset for the next source + adjVertices = new HashSet<>(); + } + vertex = source; + adjVertices.add(target); + } + + if (vertex != null) { + triangles += this.intersect(direction, degree, adjVertices); + triads += this.localTriads(adjVertices.size()); + totalVertices++; + } + + String suffix = "_" + direction.string(); + return ImmutableMap.of("edges" + suffix, total, + "vertices" + suffix, totalVertices, + KEY_TRIANGLES, triangles, + KEY_TRIADS, triads); + } + + protected long intersect(Directions dir, long degree, + Set<Id> adjVertices) { + long count = 0L; + Iterator<Id> vertices; + for (Id v : adjVertices) { + vertices = this.adjacentVertices(v, dir, null, degree); + while (vertices.hasNext()) { + Id vertex = vertices.next(); + if (adjVertices.contains(vertex)) { + count++; + } + } + } + return count; + } + + protected long localTriads(int size) { + return size * (size - 1L) / 2L; + } + } +}
