This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch pd-store
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git

commit 94edfcd72014851936f55704bed172541805e5be
Author: Wu Chencan <[email protected]>
AuthorDate: Tue Oct 24 06:33:24 2023 -0500

    feat(core): support batch+parallel edges traverse (#2312)
    
    - Enhance Consumers.java, supporting ExceptionHandle and `Future` to handle 
InterruptedException when awaiting
    - Add Nested Iterator Edge and support batch execution
    - Support batch execution & thread parallel in KoutTraverser and Kneighbor
---
 .../backend/query/EdgesQueryIterator.java          |  64 ++++++
 .../org/apache/hugegraph/task/TaskManager.java     |  15 +-
 .../traversal/algorithm/HugeTraverser.java         |  45 +++++
 .../traversal/algorithm/KneighborTraverser.java    |  56 +++---
 .../traversal/algorithm/KoutTraverser.java         |  76 +++----
 .../traversal/algorithm/OltpTraverser.java         | 223 ++++++++++++++++++++-
 .../algorithm/records/KneighborRecords.java        |  14 +-
 .../hugegraph/traversal/algorithm/steps/Steps.java |   4 +
 .../java/org/apache/hugegraph/util/Consumers.java  | 128 ++++++++----
 .../backend/store/rocksdb/RocksDBStore.java        |  13 +-
 10 files changed, 516 insertions(+), 122 deletions(-)

diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/query/EdgesQueryIterator.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/query/EdgesQueryIterator.java
new file mode 100644
index 000000000..4ab9a8859
--- /dev/null
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/query/EdgesQueryIterator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.query;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hugegraph.backend.id.Id;
+import org.apache.hugegraph.backend.tx.GraphTransaction;
+import org.apache.hugegraph.type.define.Directions;
+
+public class EdgesQueryIterator implements Iterator<Query> {
+
+    private final List<Id> labels;
+    private final Directions directions;
+    private final long limit;
+    private final Iterator<Id> sources;
+
+    public EdgesQueryIterator(Iterator<Id> sources,
+                              Directions directions,
+                              List<Id> labels,
+                              long limit) {
+        this.sources = sources;
+        this.labels = labels;
+        this.directions = directions;
+        // Traverse NO_LIMIT ε’Œ Query.NO_LIMIT 不同
+        this.limit = limit < 0 ? Query.NO_LIMIT : limit;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return sources.hasNext();
+    }
+
+    @Override
+    public Query next() {
+        Id sourceId = this.sources.next();
+        ConditionQuery query = GraphTransaction.constructEdgesQuery(sourceId,
+                                                                    
this.directions,
+                                                                    
this.labels);
+        if (this.limit != Query.NO_LIMIT) {
+            query.limit(this.limit);
+            query.capacity(this.limit);
+        } else {
+            query.capacity(Query.NO_CAPACITY);
+        }
+        return query;
+    }
+}
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
index 056b7ac5a..177af64ba 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
@@ -26,16 +26,17 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.hugegraph.HugeException;
+import org.apache.hugegraph.HugeGraphParams;
+import org.apache.hugegraph.concurrent.PausableScheduledThreadPool;
 import org.apache.hugegraph.type.define.NodeRole;
-import org.apache.hugegraph.util.*;
 import org.apache.hugegraph.util.Consumers;
+import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.ExecutorUtil;
 import org.apache.hugegraph.util.LockUtil;
+import org.apache.hugegraph.util.Log;
 import org.slf4j.Logger;
 
-import org.apache.hugegraph.HugeException;
-import org.apache.hugegraph.HugeGraphParams;
-import org.apache.hugegraph.concurrent.PausableScheduledThreadPool;
-
 public final class TaskManager {
 
     private static final Logger LOG = Log.logger(TaskManager.class);
@@ -53,7 +54,7 @@ public final class TaskManager {
     public static final String DISTRIBUTED_TASK_SCHEDULER = 
"distributed-scheduler-%d";
 
     protected static final long SCHEDULE_PERIOD = 1000L; // unit ms
-
+    private static final long TX_CLOSE_TIMEOUT = 30L; // unit s
     private static final int THREADS = 4;
     private static final TaskManager MANAGER = new TaskManager(THREADS);
 
@@ -184,7 +185,7 @@ public final class TaskManager {
                 graph.closeTx();
             } else {
                 Consumers.executeOncePerThread(this.taskExecutor, totalThreads,
-                                               graph::closeTx);
+                                               graph::closeTx, 
TX_CLOSE_TIMEOUT);
             }
         } catch (Exception e) {
             throw new HugeException("Exception when closing task tx", e);
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java
index f5415d9c5..194576e85 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java
@@ -17,6 +17,8 @@
 
 package org.apache.hugegraph.traversal.algorithm;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -37,6 +39,7 @@ import org.apache.hugegraph.HugeGraph;
 import org.apache.hugegraph.backend.id.Id;
 import org.apache.hugegraph.backend.query.Aggregate;
 import org.apache.hugegraph.backend.query.ConditionQuery;
+import org.apache.hugegraph.backend.query.EdgesQueryIterator;
 import org.apache.hugegraph.backend.query.Query;
 import org.apache.hugegraph.backend.query.QueryResults;
 import org.apache.hugegraph.backend.tx.GraphTransaction;
@@ -66,6 +69,7 @@ import org.apache.hugegraph.util.collection.CollectionFactory;
 import org.apache.hugegraph.util.collection.ObjectIntMapping;
 import org.apache.hugegraph.util.collection.ObjectIntMappingFactory;
 import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
 import org.slf4j.Logger;
 
 import com.google.common.collect.ImmutableList;
@@ -465,6 +469,13 @@ public class HugeTraverser {
         return edgeStep.skipSuperNodeIfNeeded(edges);
     }
 
+    public EdgesIterator edgesOfVertices(Iterator<Id> sources,
+                                         Directions dir,
+                                         List<Id> labelIds,
+                                         long degree) {
+        return new EdgesIterator(new EdgesQueryIterator(sources, dir, 
labelIds, degree));
+    }
+
     public Iterator<Edge> edgesOfVertex(Id source, Steps steps) {
         List<Id> edgeLabels = steps.edgeLabels();
         ConditionQuery cq = GraphTransaction.constructEdgesQuery(
@@ -474,6 +485,11 @@ public class HugeTraverser {
             cq.limit(steps.limit());
         }
 
+        if (steps.isEdgeEmpty()) {
+            Iterator<Edge> edges = this.graph().edges(cq);
+            return edgesOfVertexStep(edges, steps);
+        }
+
         Map<Id, ConditionQuery> edgeConditions =
                 getFilterQueryConditions(steps.edgeSteps(), HugeType.EDGE);
 
@@ -1004,4 +1020,33 @@ public class HugeTraverser {
             return edges;
         }
     }
+
+    public class EdgesIterator implements Iterator<Iterator<Edge>>, Closeable {
+
+        private final Iterator<Iterator<Edge>> currentIter;
+
+        public EdgesIterator(EdgesQueryIterator queries) {
+            List<Iterator<Edge>> iteratorList = new ArrayList<>();
+            while (queries.hasNext()) {
+                Iterator<Edge> edges = graph.edges(queries.next());
+                iteratorList.add(edges);
+            }
+            this.currentIter = iteratorList.iterator();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return this.currentIter.hasNext();
+        }
+
+        @Override
+        public Iterator<Edge> next() {
+            return this.currentIter.next();
+        }
+
+        @Override
+        public void close() throws IOException {
+            CloseableIterator.closeIterator(currentIter);
+        }
+    }
 }
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java
index 9f16f480b..565d0af5f 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java
@@ -17,11 +17,11 @@
 
 package org.apache.hugegraph.traversal.algorithm;
 
-import java.util.Iterator;
 import java.util.Set;
 import java.util.function.Consumer;
 
 import org.apache.hugegraph.HugeGraph;
+import org.apache.hugegraph.backend.id.EdgeId;
 import org.apache.hugegraph.backend.id.Id;
 import org.apache.hugegraph.structure.HugeEdge;
 import org.apache.hugegraph.traversal.algorithm.records.KneighborRecords;
@@ -48,25 +48,27 @@ public class KneighborTraverser extends OltpTraverser {
 
         Id labelId = this.getEdgeLabelId(label);
 
-        Set<Id> latest = newSet();
-        Set<Id> all = newSet();
+        KneighborRecords records = new KneighborRecords(true, sourceV, true);
 
-        latest.add(sourceV);
-        this.vertexIterCounter.addAndGet(1L);
+        Consumer<EdgeId> consumer = edgeId -> {
+            if (this.reachLimit(limit, records.size())) {
+                return;
+            }
+            records.addPath(edgeId.ownerVertexId(), edgeId.otherVertexId());
+        };
 
         while (depth-- > 0) {
-            long remaining = limit == NO_LIMIT ? NO_LIMIT : limit - all.size();
-            latest = this.adjacentVertices(sourceV, latest, dir, labelId,
-                                           all, degree, remaining);
-            all.addAll(latest);
-            this.vertexIterCounter.addAndGet(1L);
-            this.edgeIterCounter.addAndGet(latest.size());
-            if (reachLimit(limit, all.size())) {
+            records.startOneLayer(true);
+            traverseIdsByBfs(records.keys(), dir, labelId, degree, NO_LIMIT, 
consumer);
+            records.finishOneLayer();
+            if (reachLimit(limit, records.size())) {
                 break;
             }
         }
 
-        return all;
+        this.vertexIterCounter.addAndGet(records.size());
+
+        return records.idsBySet(limit);
     }
 
     public KneighborRecords customizedKneighbor(Id source, Steps steps,
@@ -76,33 +78,29 @@ public class KneighborTraverser extends OltpTraverser {
         checkPositive(maxDepth, "k-neighbor max_depth");
         checkLimit(limit);
 
-        boolean concurrent = maxDepth >= this.concurrentDepth();
-
-        KneighborRecords records = new KneighborRecords(concurrent,
+        KneighborRecords records = new KneighborRecords(true,
                                                         source, true);
 
-        Consumer<Id> consumer = v -> {
+        Consumer<Edge> consumer = edge -> {
             if (this.reachLimit(limit, records.size())) {
                 return;
             }
-            Iterator<Edge> edges = edgesOfVertex(v, steps);
-            this.vertexIterCounter.addAndGet(1L);
-            while (!this.reachLimit(limit, records.size()) && edges.hasNext()) 
{
-                HugeEdge edge = (HugeEdge) edges.next();
-                Id target = edge.id().otherVertexId();
-                records.addPath(v, target);
-
-                records.edgeResults().addEdge(v, target, edge);
-
-                this.edgeIterCounter.addAndGet(1L);
-            }
+            EdgeId edgeId = ((HugeEdge) edge).id();
+            records.addPath(edgeId.ownerVertexId(), edgeId.otherVertexId());
+            records.edgeResults().addEdge(edgeId.ownerVertexId(), 
edgeId.otherVertexId(), edge);
         };
 
         while (maxDepth-- > 0) {
             records.startOneLayer(true);
-            traverseIds(records.keys(), consumer, concurrent);
+            traverseIdsByBfs(records.keys(), steps, NO_LIMIT, consumer);
             records.finishOneLayer();
+            if (this.reachLimit(limit, records.size())) {
+                break;
+            }
         }
+
+        this.vertexIterCounter.addAndGet(records.size());
+
         return records;
     }
 
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java
index 9924c766c..c683694c1 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java
@@ -18,12 +18,15 @@
 package org.apache.hugegraph.traversal.algorithm;
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 import java.util.function.Consumer;
 
 import org.apache.hugegraph.HugeException;
 import org.apache.hugegraph.HugeGraph;
+import org.apache.hugegraph.backend.id.EdgeId;
 import org.apache.hugegraph.backend.id.Id;
+import org.apache.hugegraph.backend.query.Query;
 import org.apache.hugegraph.structure.HugeEdge;
 import org.apache.hugegraph.traversal.algorithm.records.KoutRecords;
 import org.apache.hugegraph.traversal.algorithm.steps.Steps;
@@ -57,34 +60,45 @@ public class KoutTraverser extends OltpTraverser {
 
         Id labelId = this.getEdgeLabelId(label);
 
-        Set<Id> latest = newIdSet();
-        latest.add(sourceV);
+        Set<Id> sources = newIdSet();
+        Set<Id> neighbors = newIdSet();
+        Set<Id> visited = nearest ? newIdSet() : null;
 
-        Set<Id> all = newIdSet();
-        all.add(sourceV);
+        neighbors.add(sourceV);
+
+        ConcurrentVerticesConsumer consumer;
+
+        long remaining = capacity == NO_LIMIT ? NO_LIMIT : capacity - 1;
 
-        long remaining = capacity == NO_LIMIT ?
-                         NO_LIMIT : capacity - latest.size();
-        this.vertexIterCounter.addAndGet(1L);
         while (depth-- > 0) {
             // Just get limit nodes in last layer if limit < remaining capacity
             if (depth == 0 && limit != NO_LIMIT &&
                 (limit < remaining || remaining == NO_LIMIT)) {
                 remaining = limit;
             }
-            if (nearest) {
-                latest = this.adjacentVertices(sourceV, latest, dir, labelId,
-                                               all, degree, remaining);
-                all.addAll(latest);
-            } else {
-                latest = this.adjacentVertices(sourceV, latest, dir, labelId,
-                                               null, degree, remaining);
+
+            if (visited != null) {
+                visited.addAll(neighbors);
             }
-            this.vertexIterCounter.addAndGet(1L);
-            this.edgeIterCounter.addAndGet(latest.size());
+
+            // swap sources and neighbors
+            Set<Id> tmp = neighbors;
+            neighbors = sources;
+            sources = tmp;
+
+            // start
+            consumer = new ConcurrentVerticesConsumer(sourceV, visited, 
remaining, neighbors);
+
+            this.vertexIterCounter.addAndGet(sources.size());
+            this.edgeIterCounter.addAndGet(neighbors.size());
+
+            traverseIdsByBfs(sources.iterator(), dir, labelId, degree, 
capacity, consumer);
+
+            sources.clear();
+
             if (capacity != NO_LIMIT) {
                 // Update 'remaining' value to record remaining capacity
-                remaining -= latest.size();
+                remaining -= neighbors.size();
 
                 if (remaining <= 0 && depth > 0) {
                     throw new HugeException(
@@ -94,7 +108,7 @@ public class KoutTraverser extends OltpTraverser {
             }
         }
 
-        return latest;
+        return neighbors;
     }
 
     public KoutRecords customizedKout(Id source, Steps steps,
@@ -107,33 +121,25 @@ public class KoutTraverser extends OltpTraverser {
         checkLimit(limit);
         long[] depth = new long[1];
         depth[0] = maxDepth;
-        boolean concurrent = maxDepth >= this.concurrentDepth();
 
-        KoutRecords records = new KoutRecords(concurrent, source, nearest, 0);
+        KoutRecords records = new KoutRecords(true, source, nearest, 0);
 
-        Consumer<Id> consumer = v -> {
+        Consumer<Edge> consumer = edge -> {
             if (this.reachLimit(limit, depth[0], records.size())) {
                 return;
             }
-            Iterator<Edge> edges = edgesOfVertex(v, steps);
-            this.vertexIterCounter.addAndGet(1L);
-            while (!this.reachLimit(limit, depth[0], records.size()) &&
-                   edges.hasNext()) {
-                HugeEdge edge = (HugeEdge) edges.next();
-                Id target = edge.id().otherVertexId();
-                records.addPath(v, target);
-                this.checkCapacity(capacity, records.accessed(), depth[0]);
-
-                records.edgeResults().addEdge(v, target, edge);
-
-                this.edgeIterCounter.addAndGet(1L);
-            }
+            EdgeId edgeId = ((HugeEdge) edge).id();
+            records.addPath(edgeId.ownerVertexId(), edgeId.otherVertexId());
+            records.edgeResults().addEdge(edgeId.ownerVertexId(), 
edgeId.otherVertexId(), edge);
         };
 
         while (depth[0]-- > 0) {
+            List<Id> sources = records.ids(Query.NO_LIMIT);
             records.startOneLayer(true);
-            this.traverseIds(records.keys(), consumer, concurrent);
+            traverseIdsByBfs(sources.iterator(), steps, capacity, consumer);
+            this.vertexIterCounter.addAndGet(sources.size());
             records.finishOneLayer();
+            checkCapacity(capacity, records.accessed(), depth[0]);
         }
         return records;
     }
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java
index b05de2422..c05d8f89f 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java
@@ -17,24 +17,36 @@
 
 package org.apache.hugegraph.traversal.algorithm;
 
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
-import com.google.common.base.Objects;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hugegraph.HugeGraph;
+import org.apache.hugegraph.backend.id.EdgeId;
 import org.apache.hugegraph.backend.id.Id;
+import org.apache.hugegraph.backend.query.EdgesQueryIterator;
 import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.iterator.FilterIterator;
+import org.apache.hugegraph.iterator.MapperIterator;
+import org.apache.hugegraph.structure.HugeEdge;
+import org.apache.hugegraph.traversal.algorithm.steps.Steps;
+import org.apache.hugegraph.type.define.Directions;
 import org.apache.hugegraph.util.Consumers;
+import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Property;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
 
-import org.apache.hugegraph.iterator.FilterIterator;
+import com.google.common.base.Objects;
 
 public abstract class OltpTraverser extends HugeTraverser
                                     implements AutoCloseable {
@@ -75,7 +87,7 @@ public abstract class OltpTraverser extends HugeTraverser
 
     protected long traversePairs(Iterator<Pair<Id, Id>> pairs,
                                  Consumer<Pair<Id, Id>> consumer) {
-        return this.traverse(pairs, consumer, "traverse-pairs");
+        return this.traverseByOne(pairs, consumer, "traverse-pairs");
     }
 
     protected long traverseIds(Iterator<Id> ids, Consumer<Id> consumer,
@@ -93,18 +105,19 @@ public abstract class OltpTraverser extends HugeTraverser
     }
 
     protected long traverseIds(Iterator<Id> ids, Consumer<Id> consumer) {
-        return this.traverse(ids, consumer, "traverse-ids");
+        return this.traverseByOne(ids, consumer, "traverse-ids");
     }
 
-    protected <K> long traverse(Iterator<K> iterator, Consumer<K> consumer,
-                                String name) {
+    protected <K> long traverseByOne(Iterator<K> iterator,
+                                     Consumer<K> consumer,
+                                     String taskName) {
         if (!iterator.hasNext()) {
             return 0L;
         }
 
         Consumers<K> consumers = new Consumers<>(executors.getExecutor(),
                                                  consumer, null);
-        consumers.start(name);
+        consumers.start(taskName);
         long total = 0L;
         try {
             while (iterator.hasNext()) {
@@ -129,11 +142,101 @@ public abstract class OltpTraverser extends HugeTraverser
         return total;
     }
 
+    protected void traverseIdsByBfs(Iterator<Id> vertices,
+                                    Directions dir,
+                                    Id label,
+                                    long degree,
+                                    long capacity,
+                                    Consumer<EdgeId> consumer) {
+        List<Id> labels = label == null ? Collections.emptyList() :
+                                          Collections.singletonList(label);
+        OneStepEdgeIterConsumer edgeIterConsumer = new 
OneStepEdgeIterConsumer(consumer, capacity);
+
+        EdgesIterator edgeIter = edgesOfVertices(vertices, dir, labels, 
degree);
+
+        // parallel out-of-order execution
+        this.traverseByBatch(edgeIter, edgeIterConsumer, "traverse-bfs-step", 
1);
+    }
+
+    protected void traverseIdsByBfs(Iterator<Id> vertices,
+                                    Steps steps,
+                                    long capacity,
+                                    Consumer<Edge> consumer) {
+        StepsEdgeIterConsumer edgeIterConsumer =
+                new StepsEdgeIterConsumer(consumer, capacity, steps);
+
+        EdgesQueryIterator queryIterator = new EdgesQueryIterator(vertices,
+                                                                  
steps.direction(),
+                                                                  
steps.edgeLabels(),
+                                                                  
steps.degree());
+
+        // get Iterator<Iterator<edges>> from Iterator<Query>
+        EdgesIterator edgeIter = new EdgesIterator(queryIterator);
+
+        // parallel out-of-order execution
+        this.traverseByBatch(edgeIter, edgeIterConsumer, "traverse-bfs-steps", 
1);
+    }
+
+    protected <K> long traverseByBatch(Iterator<Iterator<K>> sources,
+                                       Consumer<Iterator<K>> consumer,
+                                       String taskName, int concurrentWorkers) 
{
+        if (!sources.hasNext()) {
+            return 0L;
+        }
+        AtomicBoolean done = new AtomicBoolean(false);
+        Consumers<Iterator<K>> consumers = null;
+        try {
+            consumers = buildConsumers(consumer, concurrentWorkers, done,
+                                       executors.getExecutor());
+            return startConsumers(sources, taskName, done, consumers);
+        } finally {
+            assert consumers != null;
+            executors.returnExecutor(consumers.executor());
+        }
+    }
+
+    private <K> long startConsumers(Iterator<Iterator<K>> sources,
+                                    String taskName,
+                                    AtomicBoolean done,
+                                    Consumers<Iterator<K>> consumers) {
+        long total = 0L;
+        try {
+            consumers.start(taskName);
+            while (sources.hasNext() && !done.get()) {
+                total++;
+                Iterator<K> v = sources.next();
+                consumers.provide(v);
+            }
+        } catch (Consumers.StopExecution e) {
+            // pass
+        } catch (Throwable e) {
+            throw Consumers.wrapException(e);
+        } finally {
+            try {
+                consumers.await();
+            } catch (Throwable e) {
+                throw Consumers.wrapException(e);
+            } finally {
+                CloseableIterator.closeIterator(sources);
+            }
+        }
+        return total;
+    }
+
+    private <K> Consumers<Iterator<K>> buildConsumers(Consumer<Iterator<K>> 
consumer,
+                                                      int queueSizePerWorker,
+                                                      AtomicBoolean done,
+                                                      ExecutorService 
executor) {
+        return new Consumers<>(executor,
+                               consumer,
+                               null,
+                               e -> done.set(true),
+                               queueSizePerWorker);
+    }
+
     protected Iterator<Vertex> filter(Iterator<Vertex> vertices,
                                       String key, Object value) {
-        return new FilterIterator<>(vertices, vertex -> {
-            return match(vertex, key, value);
-        });
+        return new FilterIterator<>(vertices, vertex -> match(vertex, key, 
value));
     }
 
     protected boolean match(Element elem, String key, Object value) {
@@ -175,4 +278,104 @@ public abstract class OltpTraverser extends HugeTraverser
             return values;
         }
     }
+
+    public static class ConcurrentVerticesConsumer implements Consumer<EdgeId> 
{
+
+        private final Id sourceV;
+        private final Set<Id> excluded;
+        private final Set<Id> neighbors;
+        private final long limit;
+        private final AtomicInteger count;
+
+        public ConcurrentVerticesConsumer(Id sourceV, Set<Id> excluded, long 
limit,
+                                          Set<Id> neighbors) {
+            this.sourceV = sourceV;
+            this.excluded = excluded;
+            this.limit = limit;
+            this.neighbors = neighbors;
+            this.count = new AtomicInteger(0);
+        }
+
+        @Override
+        public void accept(EdgeId edgeId) {
+            if (this.limit != NO_LIMIT && count.get() >= this.limit) {
+                throw new Consumers.StopExecution("reach limit");
+            }
+
+            Id targetV = edgeId.otherVertexId();
+            if (this.sourceV.equals(targetV)) {
+                return;
+            }
+
+            if (this.excluded != null && this.excluded.contains(targetV)) {
+                return;
+            }
+
+            if (this.neighbors.add(targetV)) {
+                if (this.limit != NO_LIMIT) {
+                    this.count.getAndIncrement();
+                }
+            }
+        }
+    }
+
+    public abstract class EdgesConsumer<T, E> implements Consumer<Iterator<T>> 
{
+
+        private final Consumer<E> consumer;
+        private final long capacity;
+
+        public EdgesConsumer(Consumer<E> consumer, long capacity) {
+            this.consumer = consumer;
+            this.capacity = capacity;
+        }
+
+        protected abstract Iterator<E> prepare(Iterator<T> iter);
+
+        @Override
+        public void accept(Iterator<T> edgeIter) {
+            Iterator<E> ids = prepare(edgeIter);
+            long counter = 0;
+            while (ids.hasNext()) {
+                if (Thread.currentThread().isInterrupted()) {
+                    LOG.warn("Consumer is Interrupted");
+                    break;
+                }
+                counter++;
+                this.consumer.accept(ids.next());
+            }
+            long total = edgeIterCounter.addAndGet(counter);
+            // traverse by batch & improve performance
+            if (this.capacity != NO_LIMIT && total >= this.capacity) {
+                throw new Consumers.StopExecution("reach capacity");
+            }
+        }
+    }
+
+    public class OneStepEdgeIterConsumer extends EdgesConsumer<Edge, EdgeId> {
+
+        public OneStepEdgeIterConsumer(Consumer<EdgeId> consumer, long 
capacity) {
+            super(consumer, capacity);
+        }
+
+        @Override
+        protected Iterator<EdgeId> prepare(Iterator<Edge> edgeIter) {
+            return new MapperIterator<>(edgeIter, (e) -> ((HugeEdge) e).id());
+        }
+    }
+
+    public class StepsEdgeIterConsumer extends EdgesConsumer<Edge, Edge> {
+
+        private final Steps steps;
+
+        public StepsEdgeIterConsumer(Consumer<Edge> consumer, long capacity,
+                                     Steps steps) {
+            super(consumer, capacity);
+            this.steps = steps;
+        }
+
+        @Override
+        protected Iterator<Edge> prepare(Iterator<Edge> edgeIter) {
+            return edgesOfVertexStep(edgeIter, this.steps);
+        }
+    }
 }
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java
index 7e04a286c..649b1c211 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java
@@ -19,7 +19,9 @@ package org.apache.hugegraph.traversal.algorithm.records;
 
 import static org.apache.hugegraph.traversal.algorithm.HugeTraverser.NO_LIMIT;
 
+import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 import java.util.Stack;
 
 import org.apache.hugegraph.backend.id.Id;
@@ -45,6 +47,17 @@ public class KneighborRecords extends 
SingleWayMultiPathsRecords {
     @Override
     public List<Id> ids(long limit) {
         List<Id> ids = CollectionFactory.newList(CollectionType.EC);
+        this.getRecords(limit, ids);
+        return ids;
+    }
+
+    public Set<Id> idsBySet(long limit) {
+        Set<Id> ids = CollectionFactory.newSet(CollectionType.EC);
+        this.getRecords(limit, ids);
+        return ids;
+    }
+
+    private void getRecords(long limit, Collection<Id> ids) {
         Stack<Record> records = this.records();
         // Not include record(i=0) to ignore source vertex
         for (int i = 1; i < records.size(); i++) {
@@ -54,7 +67,6 @@ public class KneighborRecords extends 
SingleWayMultiPathsRecords {
                 limit--;
             }
         }
-        return ids;
     }
 
     @Override
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/steps/Steps.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/steps/Steps.java
index d1a9238be..c2a1a7e1e 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/steps/Steps.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/steps/Steps.java
@@ -138,6 +138,10 @@ public class Steps {
         return new ArrayList<>(this.edgeSteps.keySet());
     }
 
+    public boolean isEdgeEmpty() {
+        return this.edgeSteps.isEmpty();
+    }
+
     public boolean isVertexEmpty() {
         return this.vertexSteps.isEmpty();
     }
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java
index 00689e0c5..06e678fd9 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java
@@ -27,16 +27,16 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
-import org.apache.hugegraph.config.CoreOptions;
-import org.slf4j.Logger;
-
 import org.apache.hugegraph.HugeException;
+import org.apache.hugegraph.config.CoreOptions;
 import org.apache.hugegraph.task.TaskManager.ContextCallable;
+import org.slf4j.Logger;
 
 public final class Consumers<V> {
 
@@ -46,16 +46,16 @@ public final class Consumers<V> {
 
     private static final Logger LOG = Log.logger(Consumers.class);
 
+    private final V QUEUE_END = (V) new Object();
     private final ExecutorService executor;
     private final Consumer<V> consumer;
-    private final Runnable done;
-
+    private final Runnable doneHandle;
+    private final Consumer<Throwable> exceptionHandle;
     private final int workers;
+    private final List<Future> runningFutures;
     private final int queueSize;
     private final CountDownLatch latch;
     private final BlockingQueue<V> queue;
-
-    private volatile boolean ending = false;
     private volatile Throwable exception = null;
 
     public Consumers(ExecutorService executor, Consumer<V> consumer) {
@@ -63,23 +63,40 @@ public final class Consumers<V> {
     }
 
     public Consumers(ExecutorService executor,
-                     Consumer<V> consumer, Runnable done) {
+                     Consumer<V> consumer, Runnable doneHandle) {
+        this(executor, consumer, doneHandle, QUEUE_WORKER_SIZE);
+    }
+
+    public Consumers(ExecutorService executor,
+                     Consumer<V> consumer,
+                     Runnable doneHandle,
+                     int queueSizePerWorker) {
+        this(executor, consumer, doneHandle, null, queueSizePerWorker);
+    }
+
+    public Consumers(ExecutorService executor,
+                     Consumer<V> consumer,
+                     Runnable doneHandle,
+                     Consumer<Throwable> exceptionHandle,
+                     int queueSizePerWorker) {
         this.executor = executor;
         this.consumer = consumer;
-        this.done = done;
+        this.doneHandle = doneHandle;
+        this.exceptionHandle = exceptionHandle;
 
         int workers = THREADS;
         if (this.executor instanceof ThreadPoolExecutor) {
             workers = ((ThreadPoolExecutor) this.executor).getCorePoolSize();
         }
         this.workers = workers;
-        this.queueSize = QUEUE_WORKER_SIZE * workers;
+
+        this.runningFutures = new ArrayList<>(workers);
+        this.queueSize = queueSizePerWorker * workers + 1;
         this.latch = new CountDownLatch(workers);
         this.queue = new ArrayBlockingQueue<>(this.queueSize);
     }
 
     public void start(String name) {
-        this.ending = false;
         this.exception = null;
         if (this.executor == null) {
             return;
@@ -87,7 +104,8 @@ public final class Consumers<V> {
         LOG.info("Starting {} workers[{}] with queue size {}...",
                  this.workers, name, this.queueSize);
         for (int i = 0; i < this.workers; i++) {
-            this.executor.submit(new ContextCallable<>(this::runAndDone));
+            this.runningFutures.add(
+                    this.executor.submit(new 
ContextCallable<>(this::runAndDone)));
         }
     }
 
@@ -95,11 +113,15 @@ public final class Consumers<V> {
         try {
             this.run();
         } catch (Throwable e) {
-            // Only the first exception of one thread can be stored
-            this.exception = e;
-            if (!(e instanceof StopExecution)) {
+            if (e instanceof StopExecution) {
+                this.queue.clear();
+                putQueueEnd();
+            } else {
+                // Only the first exception to one thread can be stored
+                this.exception = e;
                 LOG.error("Error when running task", e);
             }
+            exceptionHandle(e);
         } finally {
             this.done();
             this.latch.countDown();
@@ -109,11 +131,7 @@ public final class Consumers<V> {
 
     private void run() {
         LOG.debug("Start to work...");
-        while (!this.ending) {
-            this.consume();
-        }
-        assert this.ending;
-        while (this.consume()){
+        while (this.consume()) {
             // ignore
         }
 
@@ -121,14 +139,18 @@ public final class Consumers<V> {
     }
 
     private boolean consume() {
-        V elem;
-        try {
-            elem = this.queue.poll(CONSUMER_WAKE_PERIOD, 
TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            // ignore
-            return true;
+        V elem = null;
+        while (elem == null) {
+            try {
+                elem = this.queue.poll(CONSUMER_WAKE_PERIOD, 
TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                // ignore
+                return false;
+            }
         }
-        if (elem == null) {
+
+        if (elem == QUEUE_END) {
+            putQueueEnd();
             return false;
         }
         // do job
@@ -136,13 +158,29 @@ public final class Consumers<V> {
         return true;
     }
 
+    private void exceptionHandle(Throwable e) {
+        if (this.exceptionHandle == null) {
+            return;
+        }
+
+        try {
+            this.exceptionHandle.accept(e);
+        } catch (Throwable ex) {
+            if (this.exception == null) {
+                this.exception = ex;
+            } else {
+                LOG.warn("Error while calling exceptionHandle()", ex);
+            }
+        }
+    }
+
     private void done() {
-        if (this.done == null) {
+        if (this.doneHandle == null) {
             return;
         }
 
         try {
-            this.done.run();
+            this.doneHandle.run();
         } catch (Throwable e) {
             if (this.exception == null) {
                 this.exception = e;
@@ -169,6 +207,16 @@ public final class Consumers<V> {
         } else {
             try {
                 this.queue.put(v);
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupt while queuing QUEUE_END", e);
+            }
+        }
+    }
+
+    private void putQueueEnd() {
+        if (this.executor != null) {
+            try {
+                this.queue.put(QUEUE_END);
             } catch (InterruptedException e) {
                 LOG.warn("Interrupted while enqueue", e);
             }
@@ -176,15 +224,18 @@ public final class Consumers<V> {
     }
 
     public void await() throws Throwable {
-        this.ending = true;
         if (this.executor == null) {
             // call done() directly if without thread pool
             this.done();
         } else {
             try {
+                putQueueEnd();
                 this.latch.await();
             } catch (InterruptedException e) {
                 String error = "Interrupted while waiting for consumers";
+                for (Future f : this.runningFutures) {
+                    f.cancel(true);
+                }
                 this.exception = new HugeException(error, e);
                 LOG.warn(error, e);
             }
@@ -201,7 +252,8 @@ public final class Consumers<V> {
 
     public static void executeOncePerThread(ExecutorService executor,
                                             int totalThreads,
-                                            Runnable callback)
+                                            Runnable callback,
+                                            long invokeTimeout)
                                             throws InterruptedException {
         // Ensure callback execute at least once for every thread
         final Map<Thread, Integer> threadsTimes = new ConcurrentHashMap<>();
@@ -230,7 +282,7 @@ public final class Consumers<V> {
         for (int i = 0; i < totalThreads; i++) {
             tasks.add(task);
         }
-        executor.invokeAll(tasks);
+        executor.invokeAll(tasks, invokeTimeout, TimeUnit.SECONDS);
     }
 
     public static ExecutorService newThreadPool(String prefix, int workers) {
@@ -290,13 +342,21 @@ public final class Consumers<V> {
         public synchronized void returnExecutor(ExecutorService executor) {
             E.checkNotNull(executor, "executor");
             if (!this.executors.offer(executor)) {
-                executor.shutdown();
+                try {
+                    executor.shutdown();
+                } catch (Exception e) {
+                    LOG.warn("close ExecutorService with error:", e);
+                }
             }
         }
 
         public synchronized void destroy() {
             for (ExecutorService executor : this.executors) {
-                executor.shutdown();
+                try {
+                    executor.shutdownNow();
+                } catch (Exception e) {
+                    LOG.warn("close ExecutorService with error:", e);
+                }
             }
             this.executors.clear();
         }
diff --git 
a/hugegraph-server/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
 
b/hugegraph-server/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
index 1d0cdba7b..ca1058b9a 100644
--- 
a/hugegraph-server/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
+++ 
b/hugegraph-server/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
@@ -44,9 +44,6 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import org.apache.commons.io.FileUtils;
-import org.rocksdb.RocksDBException;
-import org.slf4j.Logger;
-
 import org.apache.hugegraph.HugeException;
 import org.apache.hugegraph.backend.BackendException;
 import org.apache.hugegraph.backend.id.Id;
@@ -69,6 +66,9 @@ import org.apache.hugegraph.util.E;
 import org.apache.hugegraph.util.ExecutorUtil;
 import org.apache.hugegraph.util.InsertionOrderUtil;
 import org.apache.hugegraph.util.Log;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+
 import com.google.common.collect.ImmutableList;
 
 public abstract class RocksDBStore extends 
AbstractBackendStore<RocksDBSessions.Session> {
@@ -93,7 +93,8 @@ public abstract class RocksDBStore extends 
AbstractBackendStore<RocksDBSessions.
 
     private static final String TABLE_GENERAL_KEY = "general";
     private static final String DB_OPEN = "db-open-%s";
-    private static final long OPEN_TIMEOUT = 600L;
+    private static final long DB_OPEN_TIMEOUT = 600L; // unit s
+    private static final long DB_CLOSE_TIMEOUT = 30L; // unit s
     /*
      * This is threads number used to concurrently opening RocksDB dbs,
      * 8 is supposed enough due to configurable data disks and
@@ -279,7 +280,7 @@ public abstract class RocksDBStore extends 
AbstractBackendStore<RocksDBSessions.
         this.useSessions();
         try {
             Consumers.executeOncePerThread(openPool, OPEN_POOL_THREADS,
-                                           this::closeSessions);
+                                           this::closeSessions, 
DB_CLOSE_TIMEOUT);
         } catch (InterruptedException e) {
             throw new BackendException("Failed to close session opened by " +
                                        "open-pool");
@@ -288,7 +289,7 @@ public abstract class RocksDBStore extends 
AbstractBackendStore<RocksDBSessions.
         boolean terminated;
         openPool.shutdown();
         try {
-            terminated = openPool.awaitTermination(OPEN_TIMEOUT,
+            terminated = openPool.awaitTermination(DB_OPEN_TIMEOUT,
                                                    TimeUnit.SECONDS);
         } catch (Throwable e) {
             throw new BackendException(


Reply via email to