javeme commented on code in PR #2312:
URL: 
https://github.com/apache/incubator-hugegraph/pull/2312#discussion_r1327192514


##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -129,6 +141,64 @@ protected <K> long traverse(Iterator<K> iterator, 
Consumer<K> consumer,
         return total;
     }
 
+    protected <K> long traverseBatch(Iterator<Iterator<K>> iterator,
+                                     Consumer<Iterator<K>> consumer,
+                                     String name, int queueWorkerSize) {
+        if (!iterator.hasNext()) {
+            return 0L;
+        }
+        AtomicBoolean done = new AtomicBoolean(false);
+        Consumers<Iterator<K>> consumers = null;
+        try {
+            consumers = getConsumers(consumer, queueWorkerSize, done,
+                                     executors.getExecutor());
+            return consumersStart(iterator, name, done, consumers);
+        } finally {
+            assert consumers != null;
+            executors.returnExecutor(consumers.executor());
+        }
+    }
+
+    private <K> long consumersStart(Iterator<Iterator<K>> iterator, String 
name,
+                                    AtomicBoolean done,
+                                    Consumers<Iterator<K>> consumers) {
+        long total = 0L;
+        try {
+            consumers.start(name);
+            while (iterator.hasNext() && !done.get()) {
+                total++;
+                Iterator<K> v = iterator.next();
+                consumers.provide(v);
+            }
+        } catch (Consumers.StopExecution e) {
+            // pass
+        } catch (Throwable e) {
+            throw Consumers.wrapException(e);
+        } finally {
+            try {
+                consumers.await();
+            } catch (Throwable e) {
+                throw Consumers.wrapException(e);
+            } finally {
+                CloseableIterator.closeIterator(iterator);
+            }
+        }
+        return total;
+    }
+
+    private <K> Consumers<Iterator<K>> getConsumers(Consumer<Iterator<K>> 
consumer,

Review Comment:
   prefer buildConsumers



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -129,6 +141,64 @@ protected <K> long traverse(Iterator<K> iterator, 
Consumer<K> consumer,
         return total;
     }
 
+    protected <K> long traverseBatch(Iterator<Iterator<K>> iterator,
+                                     Consumer<Iterator<K>> consumer,
+                                     String name, int queueWorkerSize) {
+        if (!iterator.hasNext()) {
+            return 0L;
+        }
+        AtomicBoolean done = new AtomicBoolean(false);
+        Consumers<Iterator<K>> consumers = null;
+        try {
+            consumers = getConsumers(consumer, queueWorkerSize, done,
+                                     executors.getExecutor());
+            return consumersStart(iterator, name, done, consumers);
+        } finally {
+            assert consumers != null;
+            executors.returnExecutor(consumers.executor());
+        }
+    }
+
+    private <K> long consumersStart(Iterator<Iterator<K>> iterator, String 
name,
+                                    AtomicBoolean done,
+                                    Consumers<Iterator<K>> consumers) {
+        long total = 0L;
+        try {
+            consumers.start(name);
+            while (iterator.hasNext() && !done.get()) {
+                total++;
+                Iterator<K> v = iterator.next();
+                consumers.provide(v);
+            }
+        } catch (Consumers.StopExecution e) {
+            // pass
+        } catch (Throwable e) {
+            throw Consumers.wrapException(e);
+        } finally {
+            try {
+                consumers.await();
+            } catch (Throwable e) {
+                throw Consumers.wrapException(e);
+            } finally {
+                CloseableIterator.closeIterator(iterator);
+            }
+        }
+        return total;
+    }
+
+    private <K> Consumers<Iterator<K>> getConsumers(Consumer<Iterator<K>> 
consumer,
+                                                    int queueWorkerSize,
+                                                    AtomicBoolean done,
+                                                    ExecutorService executor) {
+        Consumers<Iterator<K>> consumers;
+        consumers = new Consumers<>(executor,

Review Comment:
   can move to the previous line



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -129,6 +141,64 @@ protected <K> long traverse(Iterator<K> iterator, 
Consumer<K> consumer,
         return total;
     }
 
+    protected <K> long traverseBatch(Iterator<Iterator<K>> iterator,
+                                     Consumer<Iterator<K>> consumer,
+                                     String name, int queueWorkerSize) {
+        if (!iterator.hasNext()) {
+            return 0L;
+        }
+        AtomicBoolean done = new AtomicBoolean(false);
+        Consumers<Iterator<K>> consumers = null;
+        try {
+            consumers = getConsumers(consumer, queueWorkerSize, done,
+                                     executors.getExecutor());
+            return consumersStart(iterator, name, done, consumers);
+        } finally {
+            assert consumers != null;
+            executors.returnExecutor(consumers.executor());
+        }
+    }
+
+    private <K> long consumersStart(Iterator<Iterator<K>> iterator, String 
name,

Review Comment:
   prefer `sources, taskName`



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -175,4 +278,100 @@ public List<V> getValues(K key) {
             return values;
         }
     }
+
+    public static class ConcurrentVerticesConsumer implements Consumer<EdgeId> 
{

Review Comment:
   expect a blank line after the class definition



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java:
##########
@@ -45,6 +47,17 @@ public int size() {
     @Override
     public List<Id> ids(long limit) {
         List<Id> ids = CollectionFactory.newList(CollectionType.EC);
+        this.getRecords(limit, ids);
+        return ids;
+    }
+
+    public Set<Id> idSet(long limit) {

Review Comment:
   prefer idsBySet



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -144,6 +214,39 @@ protected boolean match(Element elem, String key, Object 
value) {
         return p.isPresent() && Objects.equal(p.value(), value);
     }
 
+    protected void bfsQuery(Iterator<Id> vertices,
+                            Directions dir,
+                            Id label,
+                            long degree,
+                            long capacity,
+                            Consumer<EdgeId> parseConsumer) {
+        List<Id> labels =
+                label == null ? Collections.emptyList() : 
Collections.singletonList(label);
+        CapacityConsumer consumer = new CapacityConsumer(parseConsumer, 
capacity);
+
+        EdgesIterator edgeIts = edgesOfVertices(vertices, dir, labels, degree);
+        // 并行乱序处理
+        this.traverseBatch(edgeIts, consumer, "traverse-ite-edge", 1);
+    }
+
+    protected void bfsQuery(Iterator<Id> vertices,
+                            Steps steps,
+                            long capacity,
+                            Consumer<Edge> parseConsumer) {
+        CapacityConsumerWithStep consumer =
+                new CapacityConsumerWithStep(parseConsumer, capacity, steps);
+
+        EdgesQueryIterator queryIterator =
+                new EdgesQueryIterator(vertices, steps.direction(), 
steps.edgeLabels(),
+                                       steps.degree());
+
+        // 这里获取边数据,以便支持 step
+        EdgesIterator edgeIts = new EdgesIterator(queryIterator);
+
+        // 并行乱序处理

Review Comment:
   ditto (can we translate this comment into English as well?)



##########
hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java:
##########
@@ -202,7 +252,7 @@ public ExecutorService executor() {
     public static void executeOncePerThread(ExecutorService executor,
                                             int totalThreads,
                                             Runnable callback)
-                                            throws InterruptedException {
+            throws InterruptedException {

Review Comment:
   keep the original style?



##########
hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java:
##########
@@ -49,13 +49,13 @@ public final class Consumers<V> {
     private final ExecutorService executor;
     private final Consumer<V> consumer;
     private final Runnable done;
-
+    private final Consumer<Throwable> exceptionHandle;
     private final int workers;
+    private final List<Future> runnings;
     private final int queueSize;
     private final CountDownLatch latch;
-    private final BlockingQueue<V> queue;
-
-    private volatile boolean ending = false;
+    private final BlockingQueue<VWrapper<V>> queue;
+    private final VWrapper<V> queueEnd = new VWrapper(null);

Review Comment:
   we can just define a marker like `private final static Object QUEUE_END = new 
Object();`



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -129,6 +141,64 @@ protected <K> long traverse(Iterator<K> iterator, 
Consumer<K> consumer,
         return total;
     }
 
+    protected <K> long traverseBatch(Iterator<Iterator<K>> iterator,
+                                     Consumer<Iterator<K>> consumer,
+                                     String name, int queueWorkerSize) {
+        if (!iterator.hasNext()) {
+            return 0L;
+        }
+        AtomicBoolean done = new AtomicBoolean(false);
+        Consumers<Iterator<K>> consumers = null;
+        try {
+            consumers = getConsumers(consumer, queueWorkerSize, done,
+                                     executors.getExecutor());
+            return consumersStart(iterator, name, done, consumers);

Review Comment:
   prefer startConsumers



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -129,6 +141,64 @@ protected <K> long traverse(Iterator<K> iterator, 
Consumer<K> consumer,
         return total;
     }
 
+    protected <K> long traverseBatch(Iterator<Iterator<K>> iterator,

Review Comment:
   prefer `Iterator<Iterator<K>> sources`



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -144,6 +214,39 @@ protected boolean match(Element elem, String key, Object 
value) {
         return p.isPresent() && Objects.equal(p.value(), value);
     }
 
+    protected void bfsQuery(Iterator<Id> vertices,
+                            Directions dir,
+                            Id label,
+                            long degree,
+                            long capacity,
+                            Consumer<EdgeId> parseConsumer) {
+        List<Id> labels =
+                label == null ? Collections.emptyList() : 
Collections.singletonList(label);
+        CapacityConsumer consumer = new CapacityConsumer(parseConsumer, 
capacity);
+
+        EdgesIterator edgeIts = edgesOfVertices(vertices, dir, labels, degree);

Review Comment:
   prefer edgeIter



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -144,6 +214,39 @@ protected boolean match(Element elem, String key, Object 
value) {
         return p.isPresent() && Objects.equal(p.value(), value);
     }
 
+    protected void bfsQuery(Iterator<Id> vertices,
+                            Directions dir,
+                            Id label,
+                            long degree,
+                            long capacity,
+                            Consumer<EdgeId> parseConsumer) {

Review Comment:
   keep consumer name?



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -129,6 +141,64 @@ protected <K> long traverse(Iterator<K> iterator, 
Consumer<K> consumer,
         return total;
     }
 
+    protected <K> long traverseBatch(Iterator<Iterator<K>> iterator,
+                                     Consumer<Iterator<K>> consumer,
+                                     String name, int queueWorkerSize) {

Review Comment:
   prefer `String taskName, int concurrentWorkers`.
   and also rename the corresponding parameter to `taskName` in the `traverse()` method.
   and also rename traverseBatch() to `traverseByBatch()`, rename traverse() to 
`traverseByOne()`



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -144,6 +214,39 @@ protected boolean match(Element elem, String key, Object 
value) {
         return p.isPresent() && Objects.equal(p.value(), value);
     }
 
+    protected void bfsQuery(Iterator<Id> vertices,
+                            Directions dir,
+                            Id label,
+                            long degree,
+                            long capacity,
+                            Consumer<EdgeId> parseConsumer) {
+        List<Id> labels =
+                label == null ? Collections.emptyList() : 
Collections.singletonList(label);
+        CapacityConsumer consumer = new CapacityConsumer(parseConsumer, 
capacity);
+
+        EdgesIterator edgeIts = edgesOfVertices(vertices, dir, labels, degree);
+        // 并行乱序处理
+        this.traverseBatch(edgeIts, consumer, "traverse-ite-edge", 1);
+    }
+
+    protected void bfsQuery(Iterator<Id> vertices,
+                            Steps steps,
+                            long capacity,
+                            Consumer<Edge> parseConsumer) {
+        CapacityConsumerWithStep consumer =
+                new CapacityConsumerWithStep(parseConsumer, capacity, steps);
+
+        EdgesQueryIterator queryIterator =
+                new EdgesQueryIterator(vertices, steps.direction(), 
steps.edgeLabels(),
+                                       steps.degree());
+
+        // 这里获取边数据,以便支持 step
+        EdgesIterator edgeIts = new EdgesIterator(queryIterator);

Review Comment:
   edgeIter



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -175,4 +278,100 @@ public List<V> getValues(K key) {
             return values;
         }
     }
+
+    public static class ConcurrentVerticesConsumer implements Consumer<EdgeId> 
{
+        private final Id sourceV;
+        private final Set<Id> excluded;
+        private final Set<Id> neighbors;
+        private final long limit;
+        private final AtomicInteger count = new AtomicInteger(0);
+
+        public ConcurrentVerticesConsumer(Id sourceV, Set<Id> excluded, long 
limit,
+                                          Set<Id> neighbors) {
+            this.sourceV = sourceV;
+            this.excluded = excluded;
+            this.limit = limit;
+            this.neighbors = neighbors;
+        }
+
+        @Override
+        public void accept(EdgeId edgeId) {
+            if (limit != NO_LIMIT && count.get() >= limit) {
+                throw new Consumers.StopExecution("reach limit");
+            }
+
+            Id targetV = edgeId.otherVertexId();
+            if (sourceV.equals(targetV)) {
+                return;
+            }
+
+            if (excluded != null && excluded.contains(targetV)) {
+                return;
+            }
+
+            if (neighbors.add(targetV)) {
+                if (limit != NO_LIMIT) {
+                    count.getAndIncrement();
+                }
+            }
+        }
+    }
+
+    public abstract class EdgeItConsumer<T, E> implements 
Consumer<Iterator<T>> {
+        private final Consumer<E> parseConsumer;
+        private final long capacity;
+
+        public EdgeItConsumer(Consumer<E> parseConsumer, long capacity) {
+            this.parseConsumer = parseConsumer;
+            this.capacity = capacity;
+        }
+
+        protected abstract Iterator<E> prepare(Iterator<T> it);

Review Comment:
   keep iter style



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -175,4 +278,100 @@ public List<V> getValues(K key) {
             return values;
         }
     }
+
+    public static class ConcurrentVerticesConsumer implements Consumer<EdgeId> 
{
+        private final Id sourceV;
+        private final Set<Id> excluded;
+        private final Set<Id> neighbors;
+        private final long limit;
+        private final AtomicInteger count = new AtomicInteger(0);
+
+        public ConcurrentVerticesConsumer(Id sourceV, Set<Id> excluded, long 
limit,
+                                          Set<Id> neighbors) {
+            this.sourceV = sourceV;
+            this.excluded = excluded;
+            this.limit = limit;
+            this.neighbors = neighbors;
+        }
+
+        @Override
+        public void accept(EdgeId edgeId) {
+            if (limit != NO_LIMIT && count.get() >= limit) {
+                throw new Consumers.StopExecution("reach limit");
+            }
+
+            Id targetV = edgeId.otherVertexId();
+            if (sourceV.equals(targetV)) {
+                return;
+            }
+
+            if (excluded != null && excluded.contains(targetV)) {
+                return;
+            }
+
+            if (neighbors.add(targetV)) {
+                if (limit != NO_LIMIT) {
+                    count.getAndIncrement();
+                }
+            }
+        }
+    }
+
+    public abstract class EdgeItConsumer<T, E> implements 
Consumer<Iterator<T>> {

Review Comment:
   EdgesConsumer



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -175,4 +278,100 @@ public List<V> getValues(K key) {
             return values;
         }
     }
+
+    public static class ConcurrentVerticesConsumer implements Consumer<EdgeId> 
{
+        private final Id sourceV;
+        private final Set<Id> excluded;
+        private final Set<Id> neighbors;
+        private final long limit;
+        private final AtomicInteger count = new AtomicInteger(0);
+
+        public ConcurrentVerticesConsumer(Id sourceV, Set<Id> excluded, long 
limit,
+                                          Set<Id> neighbors) {
+            this.sourceV = sourceV;
+            this.excluded = excluded;
+            this.limit = limit;
+            this.neighbors = neighbors;
+        }
+
+        @Override
+        public void accept(EdgeId edgeId) {
+            if (limit != NO_LIMIT && count.get() >= limit) {
+                throw new Consumers.StopExecution("reach limit");
+            }
+
+            Id targetV = edgeId.otherVertexId();
+            if (sourceV.equals(targetV)) {
+                return;
+            }
+
+            if (excluded != null && excluded.contains(targetV)) {
+                return;
+            }
+
+            if (neighbors.add(targetV)) {
+                if (limit != NO_LIMIT) {
+                    count.getAndIncrement();
+                }
+            }
+        }
+    }
+
+    public abstract class EdgeItConsumer<T, E> implements 
Consumer<Iterator<T>> {
+        private final Consumer<E> parseConsumer;
+        private final long capacity;
+
+        public EdgeItConsumer(Consumer<E> parseConsumer, long capacity) {
+            this.parseConsumer = parseConsumer;
+            this.capacity = capacity;
+        }
+
+        protected abstract Iterator<E> prepare(Iterator<T> it);
+
+        @Override
+        public void accept(Iterator<T> edges) {
+            Iterator<E> ids = prepare(edges);
+            long counter = 0;
+            while (ids.hasNext()) {
+                if (Thread.currentThread().isInterrupted()) {
+                    LOG.warn("Consumer is Interrupted");
+                    break;
+                }
+                counter++;
+                parseConsumer.accept(ids.next());
+            }
+            long total = edgeIterCounter.addAndGet(counter);
+            // 按批次检测 capacity,以提高性能

Review Comment:
   ditto (can we translate this comment into English as well?)



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -175,4 +278,100 @@ public List<V> getValues(K key) {
             return values;
         }
     }
+
+    public static class ConcurrentVerticesConsumer implements Consumer<EdgeId> 
{
+        private final Id sourceV;
+        private final Set<Id> excluded;
+        private final Set<Id> neighbors;
+        private final long limit;
+        private final AtomicInteger count = new AtomicInteger(0);
+
+        public ConcurrentVerticesConsumer(Id sourceV, Set<Id> excluded, long 
limit,
+                                          Set<Id> neighbors) {
+            this.sourceV = sourceV;
+            this.excluded = excluded;
+            this.limit = limit;
+            this.neighbors = neighbors;
+        }
+
+        @Override
+        public void accept(EdgeId edgeId) {
+            if (limit != NO_LIMIT && count.get() >= limit) {
+                throw new Consumers.StopExecution("reach limit");
+            }
+
+            Id targetV = edgeId.otherVertexId();
+            if (sourceV.equals(targetV)) {
+                return;
+            }
+
+            if (excluded != null && excluded.contains(targetV)) {
+                return;
+            }
+
+            if (neighbors.add(targetV)) {
+                if (limit != NO_LIMIT) {
+                    count.getAndIncrement();
+                }
+            }
+        }
+    }
+
+    public abstract class EdgeItConsumer<T, E> implements 
Consumer<Iterator<T>> {
+        private final Consumer<E> parseConsumer;
+        private final long capacity;
+
+        public EdgeItConsumer(Consumer<E> parseConsumer, long capacity) {
+            this.parseConsumer = parseConsumer;
+            this.capacity = capacity;
+        }
+
+        protected abstract Iterator<E> prepare(Iterator<T> it);
+
+        @Override
+        public void accept(Iterator<T> edges) {
+            Iterator<E> ids = prepare(edges);
+            long counter = 0;
+            while (ids.hasNext()) {
+                if (Thread.currentThread().isInterrupted()) {
+                    LOG.warn("Consumer is Interrupted");
+                    break;
+                }
+                counter++;
+                parseConsumer.accept(ids.next());
+            }
+            long total = edgeIterCounter.addAndGet(counter);
+            // 按批次检测 capacity,以提高性能
+            if (this.capacity != NO_LIMIT && total >= capacity) {
+                throw new Consumers.StopExecution("reach capacity");
+            }
+        }
+    }
+
+    public class CapacityConsumer extends EdgeItConsumer<Edge, EdgeId> {
+        public CapacityConsumer(Consumer<EdgeId> parseConsumer, long capacity) 
{
+            super(parseConsumer, capacity);
+        }
+
+        @Override
+        protected Iterator<EdgeId> prepare(Iterator<Edge> edges) {
+            return new MapperIterator<>(edges, (e) -> ((HugeEdge) e).id());
+        }
+    }
+
+    public class CapacityConsumerWithStep extends EdgeItConsumer<Edge, Edge> {
+        private final Steps steps;
+
+        public CapacityConsumerWithStep(Consumer<Edge> parseConsumer, long 
capacity,
+                                        Steps steps) {
+            super(parseConsumer, capacity);
+            this.steps = steps;
+        }
+
+        @Override
+        protected Iterator<Edge> prepare(Iterator<Edge> edges) {
+            return edgesOfVertexStep(edges, steps);
+        }
+    }
+

Review Comment:
   remove the blank line



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -144,6 +214,39 @@ protected boolean match(Element elem, String key, Object 
value) {
         return p.isPresent() && Objects.equal(p.value(), value);
     }
 
+    protected void bfsQuery(Iterator<Id> vertices,
+                            Directions dir,
+                            Id label,
+                            long degree,
+                            long capacity,
+                            Consumer<EdgeId> parseConsumer) {
+        List<Id> labels =
+                label == null ? Collections.emptyList() : 
Collections.singletonList(label);
+        CapacityConsumer consumer = new CapacityConsumer(parseConsumer, 
capacity);
+
+        EdgesIterator edgeIts = edgesOfVertices(vertices, dir, labels, degree);
+        // 并行乱序处理
+        this.traverseBatch(edgeIts, consumer, "traverse-ite-edge", 1);
+    }
+
+    protected void bfsQuery(Iterator<Id> vertices,
+                            Steps steps,
+                            long capacity,
+                            Consumer<Edge> parseConsumer) {
+        CapacityConsumerWithStep consumer =
+                new CapacityConsumerWithStep(parseConsumer, capacity, steps);
+
+        EdgesQueryIterator queryIterator =
+                new EdgesQueryIterator(vertices, steps.direction(), 
steps.edgeLabels(),
+                                       steps.degree());
+
+        // 这里获取边数据,以便支持 step
+        EdgesIterator edgeIts = new EdgesIterator(queryIterator);
+
+        // 并行乱序处理
+        this.traverseBatch(edgeIts, consumer, "traverse-ite-edge", 1);
+    }

Review Comment:
   move to line 143?



##########
hugegraph-core/src/main/java/org/apache/hugegraph/backend/query/EdgesQueryIterator.java:
##########
@@ -0,0 +1,64 @@
+/*

Review Comment:
   can we refine the title like `feat(core): support batch+parallel edges 
traverse`



##########
hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java:
##########
@@ -49,13 +49,13 @@ public final class Consumers<V> {
     private final ExecutorService executor;
     private final Consumer<V> consumer;
     private final Runnable done;
-
+    private final Consumer<Throwable> exceptionHandle;
     private final int workers;
+    private final List<Future> runnings;

Review Comment:
   prefer runningFutures



##########
hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java:
##########
@@ -49,13 +49,13 @@ public final class Consumers<V> {
     private final ExecutorService executor;
     private final Consumer<V> consumer;
     private final Runnable done;
-
+    private final Consumer<Throwable> exceptionHandle;

Review Comment:
   also rename `Runnable done` to `Runnable doneHandle`?



##########
hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java:
##########
@@ -168,23 +205,36 @@ public void provide(V v) throws Throwable {
             throw this.throwException();
         } else {
             try {
-                this.queue.put(v);
+                this.queue.put(new VWrapper<>(v));
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted while enqueue", e);
+            }
+        }
+    }
+
+    private void putEnd() {

Review Comment:
   prefer putQueueEnd



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -144,6 +214,39 @@ protected boolean match(Element elem, String key, Object 
value) {
         return p.isPresent() && Objects.equal(p.value(), value);
     }
 
+    protected void bfsQuery(Iterator<Id> vertices,
+                            Directions dir,
+                            Id label,
+                            long degree,
+                            long capacity,
+                            Consumer<EdgeId> parseConsumer) {
+        List<Id> labels =
+                label == null ? Collections.emptyList() : 
Collections.singletonList(label);
+        CapacityConsumer consumer = new CapacityConsumer(parseConsumer, 
capacity);
+
+        EdgesIterator edgeIts = edgesOfVertices(vertices, dir, labels, degree);
+        // 并行乱序处理

Review Comment:
   can we translate it



##########
hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java:
##########
@@ -64,42 +64,63 @@ public Consumers(ExecutorService executor, Consumer<V> 
consumer) {
 
     public Consumers(ExecutorService executor,
                      Consumer<V> consumer, Runnable done) {
+        this(executor, consumer, done, QUEUE_WORKER_SIZE);
+    }
+
+    public Consumers(ExecutorService executor,
+                     Consumer<V> consumer,
+                     Runnable done,
+                     int queueWorkerSize) {
+        this(executor, consumer, done, null, queueWorkerSize);
+    }
+
+    public Consumers(ExecutorService executor,
+                     Consumer<V> consumer,
+                     Runnable done,
+                     Consumer<Throwable> handle,
+                     int queueWorkerSize) {

Review Comment:
   rename to queueSizePerWorker?



##########
hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java:
##########
@@ -230,7 +280,7 @@ public static void executeOncePerThread(ExecutorService 
executor,
         for (int i = 0; i < totalThreads; i++) {
             tasks.add(task);
         }
-        executor.invokeAll(tasks);
+        executor.invokeAll(tasks, 5, TimeUnit.SECONDS);

Review Comment:
   add a timeout arg?



##########
hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java:
##########
@@ -64,42 +64,63 @@ public Consumers(ExecutorService executor, Consumer<V> 
consumer) {
 
     public Consumers(ExecutorService executor,
                      Consumer<V> consumer, Runnable done) {
+        this(executor, consumer, done, QUEUE_WORKER_SIZE);
+    }
+
+    public Consumers(ExecutorService executor,
+                     Consumer<V> consumer,
+                     Runnable done,
+                     int queueWorkerSize) {
+        this(executor, consumer, done, null, queueWorkerSize);
+    }
+
+    public Consumers(ExecutorService executor,
+                     Consumer<V> consumer,
+                     Runnable done,
+                     Consumer<Throwable> handle,

Review Comment:
   improve `handle` var name



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -144,6 +214,39 @@ protected boolean match(Element elem, String key, Object 
value) {
         return p.isPresent() && Objects.equal(p.value(), value);
     }
 
+    protected void bfsQuery(Iterator<Id> vertices,

Review Comment:
   also keep traverseIdsByBfs style?



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -144,6 +214,39 @@ protected boolean match(Element elem, String key, Object 
value) {
         return p.isPresent() && Objects.equal(p.value(), value);
     }
 
+    protected void bfsQuery(Iterator<Id> vertices,
+                            Directions dir,
+                            Id label,
+                            long degree,
+                            long capacity,
+                            Consumer<EdgeId> parseConsumer) {
+        List<Id> labels =
+                label == null ? Collections.emptyList() : 
Collections.singletonList(label);
+        CapacityConsumer consumer = new CapacityConsumer(parseConsumer, 
capacity);
+
+        EdgesIterator edgeIts = edgesOfVertices(vertices, dir, labels, degree);
+        // 并行乱序处理
+        this.traverseBatch(edgeIts, consumer, "traverse-ite-edge", 1);
+    }
+
+    protected void bfsQuery(Iterator<Id> vertices,

Review Comment:
   also keep traverseIdsByBfs style?



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -144,6 +214,39 @@ protected boolean match(Element elem, String key, Object 
value) {
         return p.isPresent() && Objects.equal(p.value(), value);
     }
 
+    protected void bfsQuery(Iterator<Id> vertices,
+                            Directions dir,
+                            Id label,
+                            long degree,
+                            long capacity,
+                            Consumer<EdgeId> parseConsumer) {
+        List<Id> labels =
+                label == null ? Collections.emptyList() : 
Collections.singletonList(label);
+        CapacityConsumer consumer = new CapacityConsumer(parseConsumer, 
capacity);
+
+        EdgesIterator edgeIts = edgesOfVertices(vertices, dir, labels, degree);
+        // 并行乱序处理
+        this.traverseBatch(edgeIts, consumer, "traverse-ite-edge", 1);
+    }
+
+    protected void bfsQuery(Iterator<Id> vertices,
+                            Steps steps,
+                            long capacity,
+                            Consumer<Edge> parseConsumer) {
+        CapacityConsumerWithStep consumer =
+                new CapacityConsumerWithStep(parseConsumer, capacity, steps);
+
+        EdgesQueryIterator queryIterator =
+                new EdgesQueryIterator(vertices, steps.direction(), 
steps.edgeLabels(),
+                                       steps.degree());

Review Comment:
   Try this style:
   ```java
   EdgesQueryIterator queryIterator = new EdgesQueryIterator(vertices, steps.direction(),
                                                             steps.edgeLabels(), steps.degree());
   ```



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -144,6 +214,39 @@ protected boolean match(Element elem, String key, Object 
value) {
         return p.isPresent() && Objects.equal(p.value(), value);
     }
 
+    protected void bfsQuery(Iterator<Id> vertices,
+                            Directions dir,
+                            Id label,
+                            long degree,
+                            long capacity,
+                            Consumer<EdgeId> parseConsumer) {
+        List<Id> labels =
+                label == null ? Collections.emptyList() : 
Collections.singletonList(label);
+        CapacityConsumer consumer = new CapacityConsumer(parseConsumer, 
capacity);
+
+        EdgesIterator edgeIts = edgesOfVertices(vertices, dir, labels, degree);
+        // 并行乱序处理
+        this.traverseBatch(edgeIts, consumer, "traverse-ite-edge", 1);

Review Comment:
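   iter (presumably: use the `iter` abbreviation for the names on this line, e.g. `edgeIts` / `"traverse-ite-edge"`)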



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -175,4 +278,100 @@ public List<V> getValues(K key) {
             return values;
         }
     }
+
+    public static class ConcurrentVerticesConsumer implements Consumer<EdgeId> 
{
+        private final Id sourceV;
+        private final Set<Id> excluded;
+        private final Set<Id> neighbors;
+        private final long limit;
+        private final AtomicInteger count = new AtomicInteger(0);
+
+        public ConcurrentVerticesConsumer(Id sourceV, Set<Id> excluded, long 
limit,
+                                          Set<Id> neighbors) {
+            this.sourceV = sourceV;
+            this.excluded = excluded;
+            this.limit = limit;
+            this.neighbors = neighbors;
+        }
+
+        @Override
+        public void accept(EdgeId edgeId) {
+            if (limit != NO_LIMIT && count.get() >= limit) {
+                throw new Consumers.StopExecution("reach limit");
+            }
+
+            Id targetV = edgeId.otherVertexId();
+            if (sourceV.equals(targetV)) {
+                return;
+            }
+
+            if (excluded != null && excluded.contains(targetV)) {
+                return;
+            }
+
+            if (neighbors.add(targetV)) {
+                if (limit != NO_LIMIT) {
+                    count.getAndIncrement();
+                }
+            }
+        }
+    }
+
+    public abstract class EdgeItConsumer<T, E> implements 
Consumer<Iterator<T>> {
+        private final Consumer<E> parseConsumer;
+        private final long capacity;
+
+        public EdgeItConsumer(Consumer<E> parseConsumer, long capacity) {
+            this.parseConsumer = parseConsumer;
+            this.capacity = capacity;
+        }
+
+        protected abstract Iterator<E> prepare(Iterator<T> it);
+
+        @Override
+        public void accept(Iterator<T> edges) {
+            Iterator<E> ids = prepare(edges);
+            long counter = 0;
+            while (ids.hasNext()) {
+                if (Thread.currentThread().isInterrupted()) {
+                    LOG.warn("Consumer is Interrupted");
+                    break;
+                }
+                counter++;
+                parseConsumer.accept(ids.next());
+            }
+            long total = edgeIterCounter.addAndGet(counter);
+            // 按批次检测 capacity,以提高性能
+            if (this.capacity != NO_LIMIT && total >= capacity) {
+                throw new Consumers.StopExecution("reach capacity");
+            }
+        }
+    }
+
+    public class CapacityConsumer extends EdgeItConsumer<Edge, EdgeId> {
+        public CapacityConsumer(Consumer<EdgeId> parseConsumer, long capacity) 
{
+            super(parseConsumer, capacity);
+        }
+
+        @Override
+        protected Iterator<EdgeId> prepare(Iterator<Edge> edges) {
+            return new MapperIterator<>(edges, (e) -> ((HugeEdge) e).id());
+        }
+    }
+
+    public class CapacityConsumerWithStep extends EdgeItConsumer<Edge, Edge> {

Review Comment:
   Prefer using `OneStepEdgeIterConsumer`/`StepsEdgeIterConsumer` instead of
   `CapacityConsumer`/`CapacityConsumerWithStep`.



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -175,4 +278,100 @@ public List<V> getValues(K key) {
             return values;
         }
     }
+
+    public static class ConcurrentVerticesConsumer implements Consumer<EdgeId> 
{
+        private final Id sourceV;
+        private final Set<Id> excluded;
+        private final Set<Id> neighbors;
+        private final long limit;
+        private final AtomicInteger count = new AtomicInteger(0);
+
+        public ConcurrentVerticesConsumer(Id sourceV, Set<Id> excluded, long 
limit,
+                                          Set<Id> neighbors) {
+            this.sourceV = sourceV;
+            this.excluded = excluded;
+            this.limit = limit;
+            this.neighbors = neighbors;
+        }
+
+        @Override
+        public void accept(EdgeId edgeId) {
+            if (limit != NO_LIMIT && count.get() >= limit) {
+                throw new Consumers.StopExecution("reach limit");
+            }
+
+            Id targetV = edgeId.otherVertexId();
+            if (sourceV.equals(targetV)) {
+                return;
+            }
+
+            if (excluded != null && excluded.contains(targetV)) {
+                return;
+            }
+
+            if (neighbors.add(targetV)) {
+                if (limit != NO_LIMIT) {
+                    count.getAndIncrement();
+                }
+            }
+        }
+    }
+
+    public abstract class EdgeItConsumer<T, E> implements 
Consumer<Iterator<T>> {
+        private final Consumer<E> parseConsumer;
+        private final long capacity;
+
+        public EdgeItConsumer(Consumer<E> parseConsumer, long capacity) {

Review Comment:
   Keep the name `consumer` here (instead of `parseConsumer`)?



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -175,4 +278,100 @@ public List<V> getValues(K key) {
             return values;
         }
     }
+
+    public static class ConcurrentVerticesConsumer implements Consumer<EdgeId> 
{
+        private final Id sourceV;
+        private final Set<Id> excluded;
+        private final Set<Id> neighbors;
+        private final long limit;
+        private final AtomicInteger count = new AtomicInteger(0);
+
+        public ConcurrentVerticesConsumer(Id sourceV, Set<Id> excluded, long 
limit,
+                                          Set<Id> neighbors) {
+            this.sourceV = sourceV;
+            this.excluded = excluded;
+            this.limit = limit;
+            this.neighbors = neighbors;
+        }
+
+        @Override
+        public void accept(EdgeId edgeId) {
+            if (limit != NO_LIMIT && count.get() >= limit) {
+                throw new Consumers.StopExecution("reach limit");
+            }
+
+            Id targetV = edgeId.otherVertexId();
+            if (sourceV.equals(targetV)) {
+                return;
+            }
+
+            if (excluded != null && excluded.contains(targetV)) {
+                return;
+            }
+
+            if (neighbors.add(targetV)) {
+                if (limit != NO_LIMIT) {
+                    count.getAndIncrement();
+                }
+            }
+        }
+    }
+
+    public abstract class EdgeItConsumer<T, E> implements 
Consumer<Iterator<T>> {
+        private final Consumer<E> parseConsumer;

Review Comment:
   Keep the name `consumer` here (instead of `parseConsumer`)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to