This is an automated email from the ASF dual-hosted git repository. jin pushed a commit to branch olap-algo in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
commit 05ce6ac3e1dbe843759249e3d979e0a12dfc3a88 Author: Jermy Li <[email protected]> AuthorDate: Wed Jun 3 22:15:33 2020 +0800 fix algorithm can't stop caused by threads exception (#18) Change-Id: I546682b19fb5a84a65dc2a3bd77d62b386722bfa --- .../hugegraph/job/algorithm/AbstractAlgorithm.java | 24 +++++----- .../baidu/hugegraph/job/algorithm/Consumers.java | 36 +++++++++++---- .../job/algorithm/comm/LouvainTraverser.java | 51 +++++++++++++--------- 3 files changed, 71 insertions(+), 40 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java index d3311772a..327905ad3 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java @@ -354,18 +354,20 @@ public abstract class AbstractAlgorithm implements Algorithm { Consumers<Vertex> consumers = new Consumers<>(this.executor, consumer, done); consumers.start(); - - long total = 0L; - while (vertices.hasNext()) { - this.updateProgress(++this.progress); - total++; - Vertex v = vertices.next(); - consumers.provide(v); + try { + long total = 0L; + while (vertices.hasNext()) { + this.updateProgress(++this.progress); + total++; + Vertex v = vertices.next(); + consumers.provide(v); + } + return total; + } catch (Throwable e) { + throw Consumers.wrapException(e); + } finally { + consumers.await(); } - - consumers.await(); - - return total; } protected Iterator<Vertex> vertices() { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java index 526419c46..f5d01d980 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java @@ -29,6 +29,7 @@ import java.util.function.Consumer; import org.slf4j.Logger; +import com.baidu.hugegraph.HugeException; import com.baidu.hugegraph.util.ExecutorUtil; import com.baidu.hugegraph.util.Log; @@ -50,6 +51,7 @@ public class Consumers<V> { private final BlockingQueue<V> queue; private volatile boolean ending = false; + private volatile Throwable exception = null; public Consumers(ExecutorService executor, Consumer<V> consumer) { this(executor, consumer, null); @@ -72,6 +74,8 @@ public class Consumers<V> { } public void start() { + this.ending = false; + this.exception = null; if (this.executor == null) { return; } @@ -81,11 +85,12 @@ public class Consumers<V> { this.executor.submit(() -> { try { this.run(); - if (this.done != null) { - this.done.run(); - } + this.done(); } catch (Throwable e) { + // Only the first exception of one thread can be stored + this.exception = e; LOG.error("Error when running task", e); + this.done(); } finally { this.latch.countDown(); } @@ -120,10 +125,19 @@ public class Consumers<V> { return true; } - public void provide(V v) { + private void done() { + if (this.done != null) { + this.done.run(); + } + } + + public void provide(V v) throws Throwable { if (this.executor == null) { + assert this.exception == null; // do job directly if without thread pool this.consumer.accept(v); + } else if (this.exception != null) { + throw this.exception; } else { try { this.queue.put(v); @@ -137,14 +151,12 @@ public class Consumers<V> { this.ending = true; if (this.executor == null) { // call done() directly if without thread pool - if (this.done != null) { - this.done.run(); - } + this.done(); } else { try { this.latch.await(); } catch (InterruptedException e) { - LOG.warn("Interrupted", e);; + LOG.warn("Interrupted", e); } } } @@ -163,4 +175,12 @@ public class Consumers<V> { return ExecutorUtil.newFixedThreadPool(workers, name); } } + + public static RuntimeException wrapException(Throwable e) { + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } + throw new HugeException("Error when running task: %s", + HugeException.rootCause(e).getMessage(), e); + } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java index e55152b10..5d7548aa3 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java @@ -421,21 +421,25 @@ public class LouvainTraverser extends AlgoTraverser { moved.incrementAndGet(); } }); - consumers.start(); - while (vertices.hasNext()) { - this.updateProgress(++this.progress); - Vertex v = vertices.next(); - if (needSkipVertex(pass, v)) { - // skip the old intermediate data, or filter clabel - continue; + consumers.start(); + try { + while (vertices.hasNext()) { + this.updateProgress(++this.progress); + Vertex v = vertices.next(); + if (needSkipVertex(pass, v)) { + // skip the old intermediate data, or filter clabel + continue; + } + total++; + consumers.provide(v); } - total++; - consumers.provide(v); + } catch (Throwable e) { + throw Consumers.wrapException(e); + } finally { + consumers.await(); } - consumers.await(); - // maybe always shocking when set degree limited return total == 0L ? 0d : moved.doubleValue() / total; } @@ -455,19 +459,24 @@ public class LouvainTraverser extends AlgoTraverser { // commit when finished this.graph().tx().commit(); }); - consumers.start(); - for (Pair<Community, Set<Id>> pair : comms) { - Community c = pair.getLeft(); - if (c.empty()) { - continue; + consumers.start(); + try { + for (Pair<Community, Set<Id>> pair : comms) { + Community c = pair.getLeft(); + if (c.empty()) { + continue; + } + this.progress += pair.getRight().size(); + this.updateProgress(this.progress); + //this.mergeCommunity(pass, pair.getLeft(), pair.getRight()); + consumers.provide(pair); } - this.progress += pair.getRight().size(); - this.updateProgress(this.progress); - //this.mergeCommunity(pass, pair.getLeft(), pair.getRight()); - consumers.provide(pair); + } catch (Throwable e) { + throw Consumers.wrapException(e); + } finally { + consumers.await(); } - consumers.await(); this.graph().tx().commit(); assert this.allMembersExist(pass);
