This is an automated email from the ASF dual-hosted git repository. jin pushed a commit to branch olap-algo in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
commit 864c1fcef2e6fe6ee1e48fb88beab915afeac826 Author: Jermy Li <[email protected]> AuthorDate: Tue Jul 28 22:10:03 2020 +0800 fix no auth with worker thread of olap algo (#27) call graph close instead of closeTx Change-Id: I0e329280b067f34daec69c9b1b2b81a6cd3309bf --- .../baidu/hugegraph/job/algorithm/Consumers.java | 34 ++++++++++++---------- .../job/algorithm/SubgraphStatAlgorithm.java | 13 +++++++-- 2 files changed, 29 insertions(+), 18 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java index 711e95edc..1c68413fc 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java @@ -30,6 +30,7 @@ import java.util.function.Consumer; import org.slf4j.Logger; import com.baidu.hugegraph.HugeException; +import com.baidu.hugegraph.task.TaskManager.ContextCallable; import com.baidu.hugegraph.util.ExecutorUtil; import com.baidu.hugegraph.util.Log; @@ -82,24 +83,27 @@ public class Consumers<V> { LOG.info("Starting {} workers[{}] with queue size {}...", this.workers, name, this.queueSize); for (int i = 0; i < this.workers; i++) { - this.executor.submit(() -> { - try { - this.run(); - this.done(); - } catch (Throwable e) { - // Only the first exception of one thread can be stored - this.exception = e; - if (!(e instanceof StopExecution)) { - LOG.error("Error when running task", e); - } - this.done(); - } finally { - this.latch.countDown(); - } - }); + this.executor.submit(new ContextCallable<>(this::runAndDone)); } } + private Void runAndDone() { + try { + this.run(); + this.done(); + } catch (Throwable e) { + // Only the first exception of one thread can be stored + this.exception = e; + if (!(e instanceof StopExecution)) { + LOG.error("Error when running task", e); + } + this.done(); + } finally { + this.latch.countDown(); + } + return null; + } + private void run() { LOG.debug("Start to work..."); while (!this.ending) { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java index 199d1b020..a098a8582 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.slf4j.Logger; import com.baidu.hugegraph.HugeGraph; import com.baidu.hugegraph.StandardHugeGraph; @@ -40,11 +41,11 @@ import com.baidu.hugegraph.job.algorithm.comm.ClusterCoeffcientAlgorithm; import com.baidu.hugegraph.job.algorithm.path.RingsDetectAlgorithm; import com.baidu.hugegraph.job.algorithm.rank.PageRankAlgorithm; import com.baidu.hugegraph.task.HugeTask; -import com.baidu.hugegraph.testutil.Whitebox; import com.baidu.hugegraph.traversal.algorithm.HugeTraverser; import com.baidu.hugegraph.traversal.optimize.HugeScriptTraversal; import com.baidu.hugegraph.util.E; import com.baidu.hugegraph.util.InsertionOrderUtil; +import com.baidu.hugegraph.util.Log; import com.google.common.collect.ImmutableMap; public class SubgraphStatAlgorithm extends AbstractAlgorithm { @@ -52,6 +53,8 @@ public class SubgraphStatAlgorithm extends AbstractAlgorithm { public static final String KEY_SUBGRAPH = "subgraph"; public static final String KEY_COPY_SCHEMA = "copy_schema"; + private static final Logger LOG = Log.logger(SubgraphStatAlgorithm.class); + @Override public String name() { return "subgraph_stat"; @@ -77,8 +80,12 @@ public class SubgraphStatAlgorithm extends AbstractAlgorithm { return traverser.subgraphStat(tmpJob); } finally { graph.truncateBackend(); - // FIXME: task thread can't call close() (will hang), use closeTx() - Whitebox.invoke(graph.getClass(), "closeTx", graph); + try { + graph.close(); + } catch (Throwable e) { + LOG.warn("Can't close subgraph_stat temp graph {}: {}", + graph, e.getMessage(), e); + } } }
