No more SynchronizedIterator for TinkerGraphComputer: each worker thread now has its own Iterator<Vertex> over its own partition of the vertices. In TinkerPop 3.3.0 (with Partitioner) this can easily be replaced.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/3dd1f6e5 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/3dd1f6e5 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/3dd1f6e5 Branch: refs/heads/tp32 Commit: 3dd1f6e5e141d715e678c66acd0516806e625047 Parents: 27a4b27 Author: Marko A. Rodriguez <[email protected]> Authored: Wed Jan 4 12:39:35 2017 -0700 Committer: Marko A. Rodriguez <[email protected]> Committed: Thu Jan 5 17:00:17 2017 -0700 ---------------------------------------------------------------------- .../process/computer/TinkerGraphComputer.java | 14 +++----- .../process/computer/TinkerWorkerPool.java | 36 +++++++++++++++++--- 2 files changed, 36 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3dd1f6e5/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java index 7523d63..2abce9a 100644 --- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java +++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java @@ -156,24 +156,21 @@ public final class TinkerGraphComputer implements GraphComputer { this.memory = new TinkerMemory(this.vertexProgram, this.mapReducers); return computerService.submit(() -> { final long time = System.currentTimeMillis(); - final TinkerGraphComputerView view; - final TinkerWorkerPool workers = new TinkerWorkerPool(this.memory, 
this.workers); + final TinkerGraphComputerView view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, null != this.vertexProgram ? this.vertexProgram.getVertexComputeKeys() : Collections.emptySet()); + final TinkerWorkerPool workers = new TinkerWorkerPool(this.graph, this.memory, this.workers); try { if (null != this.vertexProgram) { - view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, this.vertexProgram.getVertexComputeKeys()); // execute the vertex program this.vertexProgram.setup(this.memory); while (true) { if (Thread.interrupted()) throw new TraversalInterruptedException(); this.memory.completeSubRound(); workers.setVertexProgram(this.vertexProgram); - final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices()); - workers.executeVertexProgram((vertexProgram, workerMemory) -> { + workers.executeVertexProgram((vertices, vertexProgram, workerMemory) -> { vertexProgram.workerIterationStart(workerMemory.asImmutable()); - while (true) { + while (vertices.hasNext()) { final Vertex vertex = vertices.next(); if (Thread.interrupted()) throw new TraversalInterruptedException(); - if (null == vertex) break; vertexProgram.execute( ComputerGraph.vertexProgram(vertex, vertexProgram), new TinkerMessenger<>(vertex, this.messageBoard, vertexProgram.getMessageCombiner()), @@ -192,9 +189,6 @@ public final class TinkerGraphComputer implements GraphComputer { } } view.complete(); // drop all transient vertex compute keys - } else { - // MapReduce only - view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, Collections.emptySet()); } // execute mapreduce jobs http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3dd1f6e5/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java ---------------------------------------------------------------------- diff --git 
a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java index 140d347..637b416 100644 --- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java +++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java @@ -23,14 +23,20 @@ import org.apache.tinkerpop.gremlin.process.computer.MapReduce; import org.apache.tinkerpop.gremlin.process.computer.VertexProgram; import org.apache.tinkerpop.gremlin.process.computer.util.MapReducePool; import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramPool; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerHelper; +import org.apache.tinkerpop.gremlin.util.function.TriConsumer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; import java.util.Queue; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.function.BiConsumer; import java.util.function.Consumer; /** @@ -48,13 +54,33 @@ public final class TinkerWorkerPool implements AutoCloseable { private VertexProgramPool vertexProgramPool; private MapReducePool mapReducePool; private final Queue<TinkerWorkerMemory> workerMemoryPool = new ConcurrentLinkedQueue<>(); + private final List<List<Vertex>> workerVertices = new ArrayList<>(); - public TinkerWorkerPool(final TinkerMemory memory, final int numberOfWorkers) { + public TinkerWorkerPool(final TinkerGraph graph, final TinkerMemory memory, final int 
numberOfWorkers) { this.numberOfWorkers = numberOfWorkers; this.workerPool = Executors.newFixedThreadPool(numberOfWorkers, THREAD_FACTORY_WORKER); this.completionService = new ExecutorCompletionService<>(this.workerPool); for (int i = 0; i < this.numberOfWorkers; i++) { this.workerMemoryPool.add(new TinkerWorkerMemory(memory)); + this.workerVertices.add(new ArrayList<>()); + } + int batchSize = TinkerHelper.getVertices(graph).size() / this.numberOfWorkers; + if (0 == batchSize) + batchSize = 1; + int counter = 0; + int index = 0; + + List<Vertex> currentWorkerVertices = this.workerVertices.get(index); + final Iterator<Vertex> iterator = graph.vertices(); + while (iterator.hasNext()) { + final Vertex vertex = iterator.next(); + if (counter++ < batchSize || index == this.workerVertices.size() - 1) { + currentWorkerVertices.add(vertex); + } else { + currentWorkerVertices = this.workerVertices.get(++index); + currentWorkerVertices.add(vertex); + counter = 1; + } } } @@ -66,12 +92,14 @@ public final class TinkerWorkerPool implements AutoCloseable { this.mapReducePool = new MapReducePool(mapReduce, this.numberOfWorkers); } - public void executeVertexProgram(final BiConsumer<VertexProgram, TinkerWorkerMemory> worker) throws InterruptedException { + public void executeVertexProgram(final TriConsumer<Iterator<Vertex>, VertexProgram, TinkerWorkerMemory> worker) throws InterruptedException { for (int i = 0; i < this.numberOfWorkers; i++) { + final int index = i; this.completionService.submit(() -> { final VertexProgram vp = this.vertexProgramPool.take(); final TinkerWorkerMemory workerMemory = this.workerMemoryPool.poll(); - worker.accept(vp, workerMemory); + final List<Vertex> vertices = this.workerVertices.get(index); + worker.accept(vertices.iterator(), vp, workerMemory); this.vertexProgramPool.offer(vp); this.workerMemoryPool.offer(workerMemory); return null;
