This is an automated email from the ASF dual-hosted git repository. jin pushed a commit to branch olap-algo in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
commit fe3572403d5c46b3dacfd18777b108a0ac539d0d Author: Jermy Li <[email protected]> AuthorDate: Wed May 20 16:54:44 2020 +0800 fix parallel LPA not commit by threads (#16) Change-Id: I8eaaeccaa0b23048a9d0f597080186c069b9799b --- .../com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java | 7 ++++++- .../main/java/com/baidu/hugegraph/job/algorithm/Consumers.java | 9 +++++++-- .../com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java | 5 +++-- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java index c36a70405..5bb3426ff 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java @@ -325,11 +325,16 @@ public abstract class AbstractAlgorithm implements Algorithm { protected long traverse(String sourceLabel, String sourceCLabel, Consumer<Vertex> consumer) { + return this.traverse(sourceLabel, sourceCLabel, consumer, null); + } + + protected long traverse(String sourceLabel, String sourceCLabel, + Consumer<Vertex> consumer, Runnable done) { Iterator<Vertex> vertices = this.vertices(sourceLabel, sourceLabel, Query.NO_LIMIT); Consumers<Vertex> consumers = new Consumers<>(this.executor, - consumer); + consumer, done); consumers.start(); long total = 0L; diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java index 795e0d712..526419c46 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java @@ -122,7 +122,7 @@ public class Consumers<V> { public void provide(V v) { if (this.executor == null) { - // do job directly + // do job directly if without thread pool this.consumer.accept(v); } else { try { @@ -135,7 +135,12 @@ public class Consumers<V> { public void await() { this.ending = true; - if (this.executor != null) { + if (this.executor == null) { + // call done() directly if without thread pool + if (this.done != null) { + this.done.run(); + } + } else { try { this.latch.await(); } catch (InterruptedException e) { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java index e98ed8480..59c420ae7 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java @@ -152,10 +152,11 @@ public class LpaAlgorithm extends AbstractCommAlgorithm { if (this.voteCommunityAndUpdate(v, edgeLabel, dir, degree)) { changed.incrementAndGet(); } + }, () -> { + // commit when finished + this.graph().tx().commit(); }); - this.graph().tx().commit(); - return total == 0L ? 0d : changed.doubleValue() / total; }
