This is an automated email from the ASF dual-hosted git repository. jin pushed a commit to branch olap-algo in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
commit a11acdffc79fba5a723932b2b6af7facbd8a4354 Author: Jermy Li <[email protected]> AuthorDate: Tue May 12 20:59:35 2020 +0800 support parallel: Louvain,LPA,Rings,K-Core,Fusiform (#15) * optimize louvain by multi threads * implement louvain threads * fix race condition * implement merge community by multi threads * remove debug info * fix genId race condition * compatible with serial and parallel computing * support parallel lpa * support parallel: Louvain,LPA,Rings,K-Core,Fusiform Change-Id: I2425d1da58581ea7a61dce72a88355ae3d2dd610 --- .../hugegraph/job/algorithm/AbstractAlgorithm.java | 53 +++- .../baidu/hugegraph/job/algorithm/Consumers.java | 161 +++++++++++ .../job/algorithm/CountEdgeAlgorithm.java | 5 +- .../job/algorithm/CountVertexAlgorithm.java | 5 +- .../cent/BetweenessCentralityAlgorithm.java | 21 +- .../cent/ClosenessCentralityAlgorithm.java | 21 +- .../algorithm/cent/DegreeCentralityAlgorithm.java | 9 +- .../cent/EigenvectorCentralityAlgorithm.java | 21 +- .../algorithm/comm/ClusterCoeffcientAlgorithm.java | 7 +- .../job/algorithm/comm/KCoreAlgorithm.java | 55 ++-- .../job/algorithm/comm/LouvainAlgorithm.java | 9 +- .../job/algorithm/comm/LouvainTraverser.java | 311 +++++++++++++-------- .../hugegraph/job/algorithm/comm/LpaAlgorithm.java | 50 ++-- .../job/algorithm/comm/TriangleCountAlgorithm.java | 7 +- .../job/algorithm/path/RingsDetectAlgorithm.java | 50 ++-- .../similarity/FusiformSimilarityAlgorithm.java | 59 ++-- 16 files changed, 585 insertions(+), 259 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java index 969bda1d8..c36a70405 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java @@ -26,6 +26,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; import org.apache.commons.lang3.StringEscapeUtils; import org.apache.commons.lang3.mutable.MutableLong; @@ -87,6 +89,7 @@ public abstract class AbstractAlgorithm implements Algorithm { public static final String KEY_CAPACITY = "capacity"; public static final String KEY_LIMIT = "limit"; public static final String KEY_ALPHA = "alpha"; + public static final String KEY_WORKERS = "workers"; public static final long DEFAULT_CAPACITY = 10000000L; public static final long DEFAULT_LIMIT = 100L; @@ -213,6 +216,15 @@ public abstract class AbstractAlgorithm implements Algorithm { return parameterString(parameters, KEY_SOURCE_CLABEL); } + protected static int workers(Map<String, Object> parameters) { + if (!parameters.containsKey(KEY_WORKERS)) { + return -1; + } + int workers = parameterInt(parameters, KEY_WORKERS); + HugeTraverser.checkNonNegativeOrNoLimit(workers, KEY_WORKERS); + return workers; + } + public static Object parameter(Map<String, Object> parameters, String key) { Object value = parameters.get(key); E.checkArgument(value != null, @@ -280,20 +292,59 @@ public abstract class AbstractAlgorithm implements Algorithm { } } - public static class AlgoTraverser extends HugeTraverser { + public static class AlgoTraverser extends HugeTraverser + implements AutoCloseable { private final Job<Object> job; + protected final ExecutorService executor; protected long progress; public AlgoTraverser(Job<Object> job) { super(job.graph()); this.job = job; + this.executor = null; + } + + protected AlgoTraverser(Job<Object> job, String name, int workers) { + super(job.graph()); + this.job = job; + String prefix = name + "-" + job.task().id(); + this.executor = Consumers.newThreadPool(prefix, workers); } public void updateProgress(long progress) { this.job.updateProgress((int) progress); } + @Override + public void close() { + if (this.executor != null) { + this.executor.shutdown(); + } + } + + protected long traverse(String sourceLabel, String sourceCLabel, + Consumer<Vertex> consumer) { + Iterator<Vertex> vertices = this.vertices(sourceLabel, sourceLabel, + Query.NO_LIMIT); + + Consumers<Vertex> consumers = new Consumers<>(this.executor, + consumer); + consumers.start(); + + long total = 0L; + while (vertices.hasNext()) { + this.updateProgress(++this.progress); + total++; + Vertex v = vertices.next(); + consumers.provide(v); + } + + consumers.await(); + + return total; + } + protected Iterator<Vertex> vertices() { return this.vertices(Query.NO_LIMIT); } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java new file mode 100644 index 000000000..795e0d712 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java @@ -0,0 +1,161 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.job.algorithm; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import org.slf4j.Logger; + +import com.baidu.hugegraph.util.ExecutorUtil; +import com.baidu.hugegraph.util.Log; + +public class Consumers<V> { + + public static final int CPUS = Runtime.getRuntime().availableProcessors(); + public static final int THREADS = 4 + CPUS / 4; + public static final int QUEUE_WORKER_SIZE = 1000; + + private static final Logger LOG = Log.logger(Consumers.class); + + private final ExecutorService executor; + private final Consumer<V> consumer; + private final Runnable done; + + private final int workers; + private final int queueSize; + private final CountDownLatch latch; + private final BlockingQueue<V> queue; + + private volatile boolean ending = false; + + public Consumers(ExecutorService executor, Consumer<V> consumer) { + this(executor, consumer, null); + } + + public Consumers(ExecutorService executor, + Consumer<V> consumer, Runnable done) { + this.executor = executor; + this.consumer = consumer; + this.done = done; + + int workers = THREADS; + if (this.executor instanceof ThreadPoolExecutor) { + workers = ((ThreadPoolExecutor) this.executor).getCorePoolSize(); + } + this.workers = workers; + this.queueSize = QUEUE_WORKER_SIZE * workers; + this.latch = new CountDownLatch(workers); + this.queue = new ArrayBlockingQueue<>(this.queueSize); + } + + public void start() { + if (this.executor == null) { + return; + } + LOG.info("Starting {} workers with queue size {}...", + this.workers, this.queueSize); + for (int i = 0; i < this.workers; i++) { + this.executor.submit(() -> { + try { + this.run(); + if (this.done != null) { + this.done.run(); + } + } catch (Throwable e) { + LOG.error("Error when running task", e); + } finally { + this.latch.countDown(); + } + }); + } + } + + private void run() { + LOG.debug("Start to work..."); + while (!this.ending) { + this.consume(); + } + assert this.ending; + while (this.consume()); + + LOG.debug("Worker finished"); + } + + private boolean consume() { + V elem; + try { + elem = this.queue.poll(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // ignore + return true; + } + if (elem == null) { + return false; + } + // do job + this.consumer.accept(elem); + return true; + } + + public void provide(V v) { + if (this.executor == null) { + // do job directly + this.consumer.accept(v); + } else { + try { + this.queue.put(v); + } catch (InterruptedException e) { + LOG.warn("Interrupted", e);; + } + } + } + + public void await() { + this.ending = true; + if (this.executor != null) { + try { + this.latch.await(); + } catch (InterruptedException e) { + LOG.warn("Interrupted", e);; + } + } + } + + public static ExecutorService newThreadPool(String prefix, int workers) { + if (workers == 0) { + return null; + } else { + if (workers < 0) { + assert workers == -1; + workers = Consumers.THREADS; + } else if (workers > Consumers.CPUS * 2) { + workers = Consumers.CPUS * 2; + } + String name = prefix + "-worker-%d"; + return ExecutorUtil.newFixedThreadPool(workers, name); + } + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountEdgeAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountEdgeAlgorithm.java index 9fb122348..670f54471 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountEdgeAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountEdgeAlgorithm.java @@ -43,8 +43,9 @@ public class CountEdgeAlgorithm extends AbstractAlgorithm { @Override public Object call(Job<Object> job, Map<String, Object> parameters) { - Traverser traverser = new Traverser(job); - return traverser.count(); + try (Traverser traverser = new Traverser(job)) { + return traverser.count(); + } } private static class Traverser extends AlgoTraverser { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountVertexAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountVertexAlgorithm.java index 582e0bb69..68a59a363 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountVertexAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountVertexAlgorithm.java @@ -43,8 +43,9 @@ public class CountVertexAlgorithm extends AbstractAlgorithm { @Override public Object call(Job<Object> job, Map<String, Object> parameters) { - Traverser traverser = new Traverser(job); - return traverser.count(); + try (Traverser traverser = new Traverser(job)) { + return traverser.count(); + } } private static class Traverser extends AlgoTraverser { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java index 12e3acba0..4f3415a15 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java @@ -42,16 +42,17 @@ public class BetweenessCentralityAlgorithm extends AbstractCentAlgorithm { @Override public Object call(Job<Object> job, Map<String, Object> parameters) { - Traverser traverser = new Traverser(job); - return traverser.betweenessCentrality(direction(parameters), - edgeLabel(parameters), - depth(parameters), - degree(parameters), - sample(parameters), - sourceLabel(parameters), - sourceSample(parameters), - sourceCLabel(parameters), - top(parameters)); + try (Traverser traverser = new Traverser(job)) { + return traverser.betweenessCentrality(direction(parameters), + edgeLabel(parameters), + depth(parameters), + degree(parameters), + sample(parameters), + sourceLabel(parameters), + sourceSample(parameters), + sourceCLabel(parameters), + top(parameters)); + } } private static class Traverser extends AbstractCentAlgorithm.Traverser { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java index cb64bd8bc..6719eee1e 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java @@ -51,16 +51,17 @@ public class ClosenessCentralityAlgorithm extends AbstractCentAlgorithm { @Override public Object call(Job<Object> job, Map<String, Object> parameters) { - Traverser traverser = new Traverser(job); - return traverser.closenessCentrality(direction(parameters), - edgeLabel(parameters), - depth(parameters), - degree(parameters), - sample(parameters), - sourceLabel(parameters), - sourceSample(parameters), - sourceCLabel(parameters), - top(parameters)); + try (Traverser traverser = new Traverser(job)) { + return traverser.closenessCentrality(direction(parameters), + edgeLabel(parameters), + depth(parameters), + degree(parameters), + sample(parameters), + sourceLabel(parameters), + sourceSample(parameters), + sourceCLabel(parameters), + top(parameters)); + } } private static class Traverser extends AbstractCentAlgorithm.Traverser { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java index a19c09822..f29a6301d 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java @@ -47,10 +47,11 @@ public class DegreeCentralityAlgorithm extends AbstractCentAlgorithm { @Override public Object call(Job<Object> job, Map<String, Object> parameters) { - Traverser traverser = new Traverser(job); - return traverser.degreeCentrality(direction(parameters), - edgeLabel(parameters), - top(parameters)); + try (Traverser traverser = new Traverser(job)) { + return traverser.degreeCentrality(direction(parameters), + edgeLabel(parameters), + top(parameters)); + } } private static class Traverser extends AlgoTraverser { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java index ce47417c4..39cec64cd 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java @@ -44,16 +44,17 @@ public class EigenvectorCentralityAlgorithm extends AbstractCentAlgorithm { @Override public Object call(Job<Object> job, Map<String, Object> parameters) { - Traverser traverser = new Traverser(job); - return traverser.eigenvectorCentrality(direction(parameters), - edgeLabel(parameters), - depth(parameters), - degree(parameters), - sample(parameters), - sourceLabel(parameters), - sourceSample(parameters), - sourceCLabel(parameters), - top(parameters)); + try (Traverser traverser = new Traverser(job)) { + return traverser.eigenvectorCentrality(direction(parameters), + edgeLabel(parameters), + depth(parameters), + degree(parameters), + sample(parameters), + sourceLabel(parameters), + sourceSample(parameters), + sourceCLabel(parameters), + top(parameters)); + } } private static class Traverser extends AbstractCentAlgorithm.Traverser { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java index cc893fc1f..52f0b07a7 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java @@ -40,9 +40,10 @@ public class ClusterCoeffcientAlgorithm extends AbstractCommAlgorithm { @Override public Object call(Job<Object> job, Map<String, Object> parameters) { - Traverser traverser = new Traverser(job); - return traverser.clusterCoeffcient(direction(parameters), - degree(parameters)); + try (Traverser traverser = new Traverser(job)) { + return traverser.clusterCoeffcient(direction(parameters), + degree(parameters)); + } } private static class Traverser extends TriangleCountAlgorithm.Traverser { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/KCoreAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/KCoreAlgorithm.java index 4cc6a88ba..6a721258a 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/KCoreAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/KCoreAlgorithm.java @@ -33,7 +33,6 @@ import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import com.baidu.hugegraph.HugeGraph; import com.baidu.hugegraph.backend.id.Id; -import com.baidu.hugegraph.backend.query.Query; import com.baidu.hugegraph.job.Job; import com.baidu.hugegraph.schema.EdgeLabel; import com.baidu.hugegraph.traversal.algorithm.FusiformSimilarityTraverser; @@ -65,19 +64,22 @@ public class KCoreAlgorithm extends AbstractCommAlgorithm { sourceCLabel(parameters); direction(parameters); edgeLabel(parameters); + workers(parameters); } @Override public Object call(Job<Object> job, Map<String, Object> parameters) { - Traverser traverser = new Traverser(job); - return traverser.kcore(sourceLabel(parameters), - sourceCLabel(parameters), - direction(parameters), - edgeLabel(parameters), - k(parameters), - alpha(parameters), - degree(parameters), - merged(parameters)); + int workers = workers(parameters); + try (Traverser traverser = new Traverser(job, workers)) { + return traverser.kcore(sourceLabel(parameters), + sourceCLabel(parameters), + direction(parameters), + edgeLabel(parameters), + k(parameters), + alpha(parameters), + degree(parameters), + merged(parameters)); + } } protected static int k(Map<String, Object> parameters) { @@ -98,16 +100,14 @@ public class KCoreAlgorithm extends AbstractCommAlgorithm { public static class Traverser extends AlgoTraverser { - public Traverser(Job<Object> job) { - super(job); + public Traverser(Job<Object> job, int workers) { + super(job, "kcore", workers); } public Object kcore(String sourceLabel, String sourceCLabel, Directions dir, String label, int k, double alpha, long degree, boolean merged) { HugeGraph graph = this.graph(); - Iterator<Vertex> vertices = this.vertices(sourceLabel, sourceCLabel, - Query.NO_LIMIT); EdgeLabel edgeLabel = label == null ? null : graph.edgeLabel(label); KcoreTraverser traverser = new KcoreTraverser(graph); @@ -115,27 +115,34 @@ public class KCoreAlgorithm extends AbstractCommAlgorithm { kcoresJson.startObject(); kcoresJson.appendKey("kcores"); kcoresJson.startList(); - Set<Set<Id>> kcoreSet = new HashSet<>(); - while(vertices.hasNext()) { - this.updateProgress(++this.progress); - Vertex vertex = vertices.next(); - Set<Id> kcore = traverser.kcore(IteratorUtils.of(vertex), + + Set<Set<Id>> kcores = new HashSet<>(); + + this.traverse(sourceLabel, sourceCLabel, v -> { + Set<Id> kcore = traverser.kcore(IteratorUtils.of(v), dir, edgeLabel, k, alpha, degree); if (kcore.isEmpty()) { - continue; + return; } if (merged) { - mergeKcores(kcoreSet, kcore); + synchronized (kcores) { + mergeKcores(kcores, kcore); + } } else { - kcoresJson.appendRaw(JsonUtil.toJson(kcore)); + String json = JsonUtil.toJson(kcore); + synchronized (kcoresJson) { + kcoresJson.appendRaw(json); + } } - } + }); + if (merged) { - for (Set<Id> kcore : kcoreSet) { + for (Set<Id> kcore : kcores) { kcoresJson.appendRaw(JsonUtil.toJson(kcore)); } } + kcoresJson.endList(); kcoresJson.endObject(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java index c0c05f9a2..446ab2686 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java @@ -42,6 +42,7 @@ public class LouvainAlgorithm extends AbstractCommAlgorithm { showModularity(parameters); showCommunity(parameters); clearPass(parameters); + workers(parameters); } @Override @@ -49,13 +50,15 @@ public class LouvainAlgorithm extends AbstractCommAlgorithm { String label = sourceLabel(parameters); String clabel = sourceCLabel(parameters); long degree = degree(parameters); + int workers = workers(parameters); - LouvainTraverser traverser = new LouvainTraverser(job, degree, - label, clabel); Long clearPass = clearPass(parameters); Long modPass = showModularity(parameters); String showComm = showCommunity(parameters); - try { + + try (LouvainTraverser traverser = new LouvainTraverser( + job, workers, degree, + label, clabel)) { if (clearPass != null) { return traverser.clearPass(clearPass.intValue()); } else if (modPass != null) { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java index a63a1259d..e55152b10 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java @@ -29,8 +29,12 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang.mutable.MutableFloat; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.tuple.Pair; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; @@ -47,6 +51,7 @@ import com.baidu.hugegraph.iterator.ListIterator; import com.baidu.hugegraph.job.Job; import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm; import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm.AlgoTraverser; +import com.baidu.hugegraph.job.algorithm.Consumers; import com.baidu.hugegraph.schema.SchemaLabel; import com.baidu.hugegraph.schema.SchemaManager; import com.baidu.hugegraph.schema.VertexLabel; @@ -78,9 +83,9 @@ public class LouvainTraverser extends AlgoTraverser { private long m; private String passLabel; - public LouvainTraverser(Job<Object> job, long degree, + public LouvainTraverser(Job<Object> job, int workers, long degree, String sourceLabel, String sourceCLabel) { - super(job); + super(job, "louvain", workers); this.g = this.graph().traversal(); this.sourceLabel = sourceLabel; this.sourceCLabel = sourceCLabel; @@ -197,13 +202,13 @@ public class LouvainTraverser extends AlgoTraverser { private void insertNewCommunity(int pass, Id cid, float cweight, int kin, List<String> members, - Map<Id, MutableInt> cedges) { + Map<Id, MutableFloat> cedges) { // create backend vertex if it's the first time Id vid = this.cache.genId(pass, cid); Vertex node = this.newCommunityNode(vid, cweight, kin, members); commitIfNeeded(); // update backend vertex edges - for (Map.Entry<Id, MutableInt> e : cedges.entrySet()) { + for (Map.Entry<Id, MutableFloat> e : cedges.entrySet()) { float weight = e.getValue().floatValue(); vid = this.cache.genId(pass, e.getKey()); Vertex targetV = this.makeCommunityNode(vid); @@ -280,7 +285,7 @@ public class LouvainTraverser extends AlgoTraverser { return 1f; } - private Id cidOfVertex(Vertex v, List<Edge> nbs) { + private Community communityOfVertex(Vertex v, List<Edge> nbs) { Id vid = (Id) v.id(); Community c = this.cache.vertex2Community(vid); // ensure source vertex exist in cache @@ -288,7 +293,7 @@ public class LouvainTraverser extends AlgoTraverser { c = this.wrapCommunity(v, nbs); assert c != null; } - return c != null ? c.cid : vid; + return c; } // 1: wrap original vertex as community node @@ -305,7 +310,7 @@ public class LouvainTraverser extends AlgoTraverser { comm = new Community(vid); comm.add(this, v, nbs); - this.cache.vertex2Community(vid, comm); + comm = this.cache.vertex2CommunityIfAbsent(vid, comm); return comm; } @@ -331,31 +336,93 @@ public class LouvainTraverser extends AlgoTraverser { return comms.values(); } - private void moveCommunity(Vertex v, List<Edge> nbs, Community newC) { + private void doMoveCommunity(Vertex v, List<Edge> nbs, Community newC) { Id vid = (Id) v.id(); - // remove v from old community - Community oldC = this.cache.vertex2Community(vid); + // update community of v (return the origin one) + Community oldC = this.cache.vertex2Community(vid, newC); + + // remove v from old community. should synchronized (vid)? if (oldC != null) { oldC.remove(this, v, nbs); } // add v to new community newC.add(this, v, nbs); - LOG.debug("Move {} to comm: {}", v, newC); - - // update community of v - this.cache.vertex2Community(vid, newC); + LOG.debug("Move {} to community: {}", v, newC); + } + + private boolean moveCommunity(Vertex v, int pass) { + // move vertex to neighbor community if needed + List<Edge> nbs = neighbors((Id) v.id()); + Community c = communityOfVertex(v, nbs); + double ki = kinOfVertex(v) + weightOfVertex(v, nbs); + // update community of v if △Q changed + double maxDeltaQ = 0d; + Community bestComm = null; + // list all neighbor communities of v + for (Pair<Community, MutableInt> nbc : nbCommunities(pass, nbs)) { + // △Q = (Ki_in - Ki * Etot / m) / 2m + Community otherC = nbc.getLeft(); + if (otherC.size() >= MAX_COMM_SIZE) { + LOG.info("Skip community {} for {} due to its size >= {}", + otherC.cid, v, MAX_COMM_SIZE); + continue; + } + // weight between c and otherC + double kiin = nbc.getRight().floatValue(); + // weight of otherC + double tot = otherC.kin() + otherC.kout(); + if (c.equals(otherC)) { + assert c == otherC; + if (tot < ki) { + /* + * expect tot >= ki, but multi-threads may + * cause tot < ki due to concurrent update otherC + */ + LOG.warn("Changing vertex: {}(ki={}, kiin={}, pass={}), otherC: {}", + v, ki, kiin, pass, otherC); + } + tot -= ki; + // assert tot >= 0d : otherC + ", tot=" + tot + ", ki=" + ki; + // expect tot >= 0, but may be something wrong? + if (tot < 0d) { + tot = 0d; + } + } + double deltaQ = kiin - ki * tot / this.m; + if (deltaQ > maxDeltaQ) { + // TODO: cache otherC for neighbors the same community + maxDeltaQ = deltaQ; + bestComm = otherC; + } + } + if (maxDeltaQ > 0d && !c.equals(bestComm)) { + // move v to the community of maxQ neighbor + doMoveCommunity(v, nbs, bestComm); + return true; + } + return false; } private double moveCommunities(int pass) { + LOG.info("Detect community for pass {}", pass); Iterator<Vertex> vertices = this.sourceVertices(pass); // shuffle //r = r.order().by(shuffle); long total = 0L; - long moved = 0L; + AtomicLong moved = new AtomicLong(0L); + + Consumers<Vertex> consumers = new Consumers<>(this.executor, v -> { + // called by multi-threads + if (this.moveCommunity(v, pass)) { + moved.incrementAndGet(); + } + }); + consumers.start(); + while (vertices.hasNext()) { this.updateProgress(++this.progress); Vertex v = vertices.next(); @@ -364,106 +431,93 @@ public class LouvainTraverser extends AlgoTraverser { continue; } total++; - List<Edge> nbs = neighbors((Id) v.id()); - Id cid = cidOfVertex(v, nbs); - double ki = kinOfVertex(v) + weightOfVertex(v, nbs); - // update community of v if △Q changed - double maxDeltaQ = 0d; - Community bestComm = null; - // list all neighbor communities of v - for (Pair<Community, MutableInt> nbc : nbCommunities(pass, nbs)) { - // △Q = (Ki_in - Ki * Etot / m) / 2m - Community otherC = nbc.getLeft(); - if (otherC.size() >= MAX_COMM_SIZE) { - LOG.info("Skip community {} for {} due to its size >= {}", - otherC.cid, v, MAX_COMM_SIZE); - continue; - } - // weight between c and otherC - double kiin = nbc.getRight().floatValue(); - // weight of otherC - double tot = otherC.kin() + otherC.kout(); - if (cid.equals(otherC.cid)) { - tot -= ki; - assert tot >= 0d; - // expect tot >= 0, but may be something wrong? - if (tot < 0d) { - tot = 0d; - } - } - double deltaQ = kiin - ki * tot / this.m; - if (deltaQ > maxDeltaQ) { - // TODO: cache otherC for neighbors the same community - maxDeltaQ = deltaQ; - bestComm = otherC; - } - } - if (maxDeltaQ > 0d && !cid.equals(bestComm.cid)) { - moved++; - // move v to the community of maxQ neighbor - moveCommunity(v, nbs, bestComm); - } + consumers.provide(v); } - // maybe always shocking when set degree limit - return total == 0L ? 0d : (double) moved / total; + consumers.await(); + + // maybe always shocking when set degree limited + return total == 0L ? 0d : moved.doubleValue() / total; } private void mergeCommunities(int pass) { + LOG.info("Merge community for pass {}", pass); // merge each community as a vertex Collection<Pair<Community, Set<Id>>> comms = this.cache.communities(); - assert this.allMembersExist(comms, pass -1); + assert this.allMembersExist(comms, pass - 1); this.cache.resetVertexWeight(); + + Consumers<Pair<Community, Set<Id>>> consumers = new Consumers<>( + this.executor, pair -> { + // called by multi-threads + this.mergeCommunity(pass, pair.getLeft(), pair.getRight()); + }, () -> { + // commit when finished + this.graph().tx().commit(); + }); + consumers.start(); + for (Pair<Community, Set<Id>> pair : comms) { - Community c = pair.getKey(); + Community c = pair.getLeft(); if (c.empty()) { continue; } - // update kin and edges between communities - int kin = c.kin(); - Set<Id> vertices = pair.getRight(); - assert !vertices.isEmpty(); - assert vertices.size() == c.size(); - List<String> members = new ArrayList<>(vertices.size()); - Map<Id, MutableInt> cedges = new HashMap<>(vertices.size()); - for (Id v : vertices) { - this.updateProgress(++this.progress); - members.add(v.toString()); - // collect edges between this community and other communities - List<Edge> neighbors = neighbors(v); - for (Edge edge : neighbors) { - Vertex otherV = ((HugeEdge) edge).otherVertex(); - if (vertices.contains(otherV.id())) { - // inner edges of this community, will be calc twice - // due to both e-in and e-out are in vertices, - kin += weightOfEdge(edge); - continue; - } - assert this.cache.vertex2Community(otherV.id()) != null; - Id otherCid = cidOfVertex(otherV, null); - if (otherCid.compareTo(c.cid) < 0) { - // skip if it should be collected by otherC - continue; - } - if (!cedges.containsKey(otherCid)) { - cedges.put(otherCid, new MutableInt(0)); - } - // update edge weight - cedges.get(otherCid).add(weightOfEdge(edge)); - } - } - // insert new community vertex and edges into storage - this.insertNewCommunity(pass, c.cid, c.weight(), kin, members, cedges); + this.progress += pair.getRight().size(); + this.updateProgress(this.progress); + //this.mergeCommunity(pass, pair.getLeft(), pair.getRight()); + consumers.provide(pair); } + consumers.await(); + this.graph().tx().commit(); + assert this.allMembersExist(pass); + // reset communities this.cache.reset(); } + private void mergeCommunity(int pass, Community c, Set<Id> cvertices) { + // update kin and edges between communities + int kin = c.kin(); + int membersSize = cvertices.size(); + assert !cvertices.isEmpty(); + assert membersSize == c.size(); + List<String> members = new ArrayList<>(membersSize); + Map<Id, MutableFloat> cedges = new HashMap<>(membersSize); + for (Id v : cvertices) { + members.add(v.toString()); + // collect edges between this community and other communities + List<Edge> neighbors = neighbors(v); + for (Edge edge : neighbors) { + Vertex otherV = ((HugeEdge) edge).otherVertex(); + if (cvertices.contains(otherV.id())) { + // inner edges of this community, will be calc twice + // due to both e-in and e-out are in vertices, + kin += weightOfEdge(edge); + continue; + } + assert this.cache.vertex2Community(otherV.id()) != null; + Id otherCid = communityOfVertex(otherV, null).cid; + if (otherCid.compareTo(c.cid) < 0) { + // skip if it should be collected by otherC + continue; + } + if (!cedges.containsKey(otherCid)) { + cedges.putIfAbsent(otherCid, new MutableFloat(0f)); + } + // update edge weight + cedges.get(otherCid).add(weightOfEdge(edge)); + } + } + + // insert new community vertex and edges into storage + this.insertNewCommunity(pass, c.cid, c.weight(), kin, members, cedges); + } + private boolean allMembersExist(Collection<Pair<Community, Set<Id>>> comms, - int pass) { - String lastLabel = labelOfPassN(pass); - GraphTraversal<Vertex, Object> t = pass < 0 ? this.g.V().id() : + int lastPass) { + String lastLabel = labelOfPassN(lastPass); + GraphTraversal<Vertex, Object> t = lastPass < 0 ? this.g.V().id() : this.g.V().hasLabel(lastLabel).id(); Set<Object> all = this.execute(t, t::toSet); for (Pair<Community, Set<Id>> comm : comms) { @@ -475,6 +529,24 @@ public class LouvainTraverser extends AlgoTraverser { return all.isEmpty(); } + private boolean allMembersExist(int pass) { + String label = labelOfPassN(pass); + int lastPass = pass - 1; + Number expected; + if (lastPass < 0) { + expected = tryNext(this.g.V().count()).longValue() - + tryNext(this.g.V().hasLabel(label).count()).longValue(); + } else { + expected = tryNext(this.g.V().hasLabel(labelOfPassN(lastPass)) + .values(C_WEIGHT).sum()); + } + Number actual = tryNext(this.g.V().hasLabel(label) + .values(C_WEIGHT).sum()); + boolean allExist = actual.floatValue() == expected.floatValue(); + assert allExist : actual + "!=" + expected; + return allExist; + } + public Object louvain(int maxTimes, int stableTimes, double precision) { assert maxTimes > 0; assert precision > 0d; @@ -678,28 +750,39 @@ public class LouvainTraverser extends AlgoTraverser { return this.weight; } - public void add(LouvainTraverser t, Vertex v, List<Edge> nbs) { + public synchronized void add(LouvainTraverser t, + Vertex v, List<Edge> nbs) { this.size++; this.weight += t.cweightOfVertex(v); this.kin += t.kinOfVertex(v); this.kout += t.weightOfVertex(v, nbs); } - public void remove(LouvainTraverser t, Vertex v, List<Edge> nbs) { + public synchronized void remove(LouvainTraverser t, + Vertex v, List<Edge> nbs) { this.size--; this.weight -= t.cweightOfVertex(v); this.kin -= t.kinOfVertex(v); this.kout -= t.weightOfVertex(v, nbs); } - public int kin() { + public synchronized int kin() { return this.kin; } - public float kout() { + public synchronized float kout() { return this.kout; } + @Override + public boolean equals(Object object) { + if (!(object instanceof Community)) { + return false; + } + Community other = (Community) object; + return Objects.equals(this.cid, other.cid); + } + @Override public String toString() { return String.format("[%s](size=%s weight=%s kin=%s kout=%s)", @@ -715,9 +798,9 @@ public class LouvainTraverser extends AlgoTraverser { private final Map<Id, Integer> genIds; public Cache() { - this.vertexWeightCache = new HashMap<>(); - this.vertex2Community = new HashMap<>(); - this.genIds = new HashMap<>(); + this.vertexWeightCache = new ConcurrentHashMap<>(); + this.vertex2Community = new ConcurrentHashMap<>(); + this.genIds = new ConcurrentHashMap<>(); } public Community vertex2Community(Object id) { @@ -725,8 +808,16 @@ public class LouvainTraverser extends AlgoTraverser { return this.vertex2Community.get(id); } - public void vertex2Community(Id id, Community c) { - this.vertex2Community.put(id, c); + public Community vertex2Community(Id id, Community c) { + return this.vertex2Community.put(id, c); + } + + public Community vertex2CommunityIfAbsent(Id id, Community c) { + Community old = this.vertex2Community.putIfAbsent(id, c); + if (old != null) { + c = old; + } + return c; } public Float vertexWeight(Id id) { @@ -748,11 +839,13 @@ public class LouvainTraverser extends AlgoTraverser { } public Id genId(int pass, Id cid) { - if (!this.genIds.containsKey(cid)) { - this.genIds.put(cid, this.genIds.size() + 1); + synchronized (this.genIds) { + if (!this.genIds.containsKey(cid)) { + this.genIds.putIfAbsent(cid, this.genIds.size() + 1); + } + String id = pass + "~" + this.genIds.get(cid); + return IdGenerator.of(id); } - String id = pass + "~" + this.genIds.get(cid); - return IdGenerator.of(id); } @SuppressWarnings("unused") diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java index abcdb938c..e98ed8480 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.tinkerpop.gremlin.process.traversal.Scope; @@ -54,14 +55,15 @@ public class LpaAlgorithm extends AbstractCommAlgorithm { direction(parameters); degree(parameters); showCommunity(parameters); + workers(parameters); } @Override public Object call(Job<Object> job, Map<String, Object> parameters) { - Traverser traverser = new Traverser(job); + int workers = workers(parameters); String showComm = showCommunity(parameters); - try { + try (Traverser traverser = new Traverser(job, workers)) { if (showComm != null) { return traverser.showCommunity(showComm); } else { @@ -84,8 +86,8 @@ public class LpaAlgorithm extends AbstractCommAlgorithm { private final Random R = new Random(); - public Traverser(Job<Object> job) { - super(job); + public Traverser(Job<Object> job, int workers) { + super(job, "lpa", workers); } public Object lpa(String sourceLabel, String edgeLabel, @@ -113,7 +115,7 @@ public class LpaAlgorithm extends AbstractCommAlgorithm { } } - long communities = this.graph().traversal().V().limit(10000L) + long communities = this.graph().traversal().V().limit(100000L) .groupCount().by(C_LABEL) .count(Scope.local).next(); return ImmutableMap.of("iteration_times", times, @@ -143,26 +145,30 @@ public class LpaAlgorithm extends AbstractCommAlgorithm { // shuffle: r.order().by(shuffle) // r = this.graph().traversal().V().sample((int) LIMIT); - // all vertices - Iterator<Vertex> vertices = this.vertices(sourceLabel, LIMIT); - - long total = 0L; - long changed = 0L; - while (vertices.hasNext()) { - this.updateProgress(++this.progress); - total++; - Vertex v = vertices.next(); - String label = this.voteCommunityOfVertex(v, edgeLabel, - dir, degree); - // update label if it's absent or changed - if (!labelPresent(v) || !label.equals(this.labelOfVertex(v))) { - changed++; - this.updateLabelOfVertex(v, label); + // detect all vertices + AtomicLong changed = new AtomicLong(0L); + long total = this.traverse(sourceLabel, null, v -> { + // called by multi-threads + if (this.voteCommunityAndUpdate(v, edgeLabel, dir, degree)) { + changed.incrementAndGet(); } - } + }); + this.graph().tx().commit(); - return total == 0L ? 0d : (double) changed / total; + return total == 0L ? 0d : changed.doubleValue() / total; + } + + private boolean voteCommunityAndUpdate(Vertex vertex, String edgeLabel, + Directions dir, long degree) { + String label = this.voteCommunityOfVertex(vertex, edgeLabel, + dir, degree); + // update label if it's absent or changed + if (!labelPresent(vertex) || !label.equals(labelOfVertex(vertex))) { + this.updateLabelOfVertex(vertex, label); + return true; + } + return false; } private String voteCommunityOfVertex(Vertex vertex, String edgeLabel, diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java index c47d19f65..34a1a5658 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java @@ -48,9 +48,10 @@ public class TriangleCountAlgorithm extends AbstractCommAlgorithm { @Override public Object call(Job<Object> job, Map<String, Object> parameters) { - Traverser traverser = new Traverser(job); - return traverser.triangleCount(direction(parameters), - degree(parameters)); + try (Traverser traverser = new Traverser(job)) { + return traverser.triangleCount(direction(parameters), + degree(parameters)); + } } protected static class Traverser extends AlgoTraverser { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/path/RingsDetectAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/path/RingsDetectAlgorithm.java index b3ce1ec99..855b7c817 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/path/RingsDetectAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/path/RingsDetectAlgorithm.java @@ -19,17 +19,11 @@ package com.baidu.hugegraph.job.algorithm.path; -import java.util.Iterator; import java.util.Map; -import org.apache.tinkerpop.gremlin.structure.Vertex; - -import com.baidu.hugegraph.HugeGraph; import com.baidu.hugegraph.backend.id.Id; -import com.baidu.hugegraph.backend.query.Query; import com.baidu.hugegraph.job.Job; import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm; -import com.baidu.hugegraph.structure.HugeVertex; import com.baidu.hugegraph.traversal.algorithm.SubGraphTraverser; import com.baidu.hugegraph.type.define.Directions; import com.baidu.hugegraph.util.JsonUtil; @@ -56,41 +50,42 @@ public class RingsDetectAlgorithm extends AbstractAlgorithm { sourceCLabel(parameters); direction(parameters); edgeLabel(parameters); + workers(parameters); } @Override public Object call(Job<Object> job, Map<String, Object> parameters) { - Traverser traverser = new Traverser(job); - return traverser.rings(sourceLabel(parameters), - sourceCLabel(parameters), - direction(parameters), - edgeLabel(parameters), - depth(parameters), - degree(parameters), - capacity(parameters), - limit(parameters)); + int workers = workers(parameters); + try (Traverser traverser = new Traverser(job, workers)) { + return traverser.rings(sourceLabel(parameters), + sourceCLabel(parameters), + direction(parameters), + edgeLabel(parameters), + depth(parameters), + degree(parameters), + capacity(parameters), + limit(parameters)); + } } public static class Traverser extends AlgoTraverser { - public Traverser(Job<Object> job) { - super(job); + public Traverser(Job<Object> job, int workers) { + super(job, "ring", workers); } public Object rings(String sourceLabel, String sourceCLabel, Directions dir, String label, int depth, long degree, long capacity, long limit) { - HugeGraph graph = this.graph(); - Iterator<Vertex> vertices = this.vertices(sourceLabel, sourceCLabel, - Query.NO_LIMIT); JsonMap ringsJson = new JsonMap(); ringsJson.startObject(); ringsJson.appendKey("rings"); ringsJson.startList(); - SubGraphTraverser traverser = new SubGraphTraverser(graph); - while(vertices.hasNext()) { - this.updateProgress(++this.progress); - Id source = ((HugeVertex) vertices.next()).id(); + + SubGraphTraverser traverser = new SubGraphTraverser(this.graph()); + + this.traverse(sourceLabel, sourceCLabel, v -> { + Id source = (Id) v.id(); PathSet rings = traverser.rings(source, dir, label, depth, true, degree, capacity, limit); for (Path ring : rings) { @@ -101,10 +96,13 @@ public class RingsDetectAlgorithm extends AbstractAlgorithm { } } if (source.equals(min)) { - ringsJson.appendRaw(JsonUtil.toJson(ring.vertices())); + String ringJson = JsonUtil.toJson(ring.vertices()); + synchronized (ringsJson) { + ringsJson.appendRaw(ringJson); + } } } - } + }); ringsJson.endList(); ringsJson.endObject(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/similarity/FusiformSimilarityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/similarity/FusiformSimilarityAlgorithm.java index 26ee4e25e..463526c5d 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/similarity/FusiformSimilarityAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/similarity/FusiformSimilarityAlgorithm.java @@ -19,14 +19,11 @@ package com.baidu.hugegraph.job.algorithm.similarity; -import java.util.Iterator; import java.util.Map; -import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import com.baidu.hugegraph.HugeGraph; -import com.baidu.hugegraph.backend.query.Query; import com.baidu.hugegraph.job.Job; import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm; import com.baidu.hugegraph.schema.EdgeLabel; @@ -72,24 +69,27 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm { sourceCLabel(parameters); direction(parameters); edgeLabel(parameters); + workers(parameters); } @Override public Object call(Job<Object> job, Map<String, Object> parameters) { - Traverser traverser = new Traverser(job); - return traverser.fusiformSimilars(sourceLabel(parameters), - sourceCLabel(parameters), - direction(parameters), - edgeLabel(parameters), - minNeighbors(parameters), - alpha(parameters), - minSimilars(parameters), - top(parameters), - groupProperty(parameters), - minGroups(parameters), - degree(parameters), - capacity(parameters), - limit(parameters)); + int workers = workers(parameters); + try (Traverser traverser = new Traverser(job, workers)) { + return traverser.fusiformSimilars(sourceLabel(parameters), + sourceCLabel(parameters), + direction(parameters), + edgeLabel(parameters), + minNeighbors(parameters), + alpha(parameters), + minSimilars(parameters), + top(parameters), + groupProperty(parameters), + minGroups(parameters), + degree(parameters), + capacity(parameters), + limit(parameters)); + } } protected static int minNeighbors(Map<String, Object> parameters) { @@ -128,8 +128,8 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm { protected static class Traverser extends AlgoTraverser { - public Traverser(Job<Object> job) { - super(job); + public Traverser(Job<Object> job, int workers) { + super(job, "fusiform", workers); } public Object fusiformSimilars(String sourceLabel, String sourceCLabel, @@ -138,31 +138,30 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm { int minSimilars, long topSimilars, String groupProperty, int minGroups, long degree, long capacity, long limit) { - Iterator<Vertex> vertices = this.vertices(sourceLabel, sourceCLabel, - Query.NO_LIMIT); HugeGraph graph = this.graph(); EdgeLabel edgeLabel = label == null ? null : graph.edgeLabel(label); - FusiformSimilarityTraverser traverser = new - FusiformSimilarityTraverser(graph); + FusiformSimilarityTraverser traverser = + new FusiformSimilarityTraverser(graph); JsonMap similarsJson = new JsonMap(); similarsJson.startObject(); - while(vertices.hasNext()) { - this.updateProgress(++this.progress); - Vertex vertex = vertices.next(); + + this.traverse(sourceLabel, sourceCLabel, v -> { SimilarsMap similars = traverser.fusiformSimilarity( - IteratorUtils.of(vertex), direction, + IteratorUtils.of(v), direction, edgeLabel, minNeighbors, alpha, minSimilars, (int) topSimilars, groupProperty, minGroups, degree, capacity, limit, true); if (similars.isEmpty()) { - continue; + return; } String result = JsonUtil.toJson(similars.toMap()); result = result.substring(1, result.length() - 1); - similarsJson.appendRaw(result); - } + synchronized (similarsJson) { + similarsJson.appendRaw(result); + } + }); similarsJson.endObject(); return similarsJson.asJson();
