This is an automated email from the ASF dual-hosted git repository.
okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push:
new 4fb4b90 Made it so that Beam can return traversers while its pipeline
is running.
4fb4b90 is described below
commit 4fb4b90c910715ae1e4825af8bec1ff5606e5c0a
Author: Marko A. Rodriguez <[email protected]>
AuthorDate: Tue Mar 26 18:50:51 2019 -0600
Made it so that Beam can return traversers while its pipeline is running.
---
.../apache/tinkerpop/machine/processor/beam/Beam.java | 18 ++++++++++++------
1 file changed, 12 insertions(+), 6 deletions(-)
diff --git
a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
index fd0bf65..8cb260a 100644
---
a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
+++
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
@@ -19,6 +19,7 @@
package org.apache.tinkerpop.machine.processor.beam;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
@@ -44,8 +45,10 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
private boolean createTraverserServer;
private final int traverserServerPort;
private final Pipeline pipeline;
+ private PipelineResult pipelineResult;
private Iterator<Traverser<C, E>> iterator = null;
+
public Beam(final Compilation<C, S, E> compilation, final String
traverserServerLocation, final int traverserServerPort, final boolean
createTraverserServer) {
this.traverserServerPort = traverserServerPort;
this.createTraverserServer = createTraverserServer;
@@ -89,12 +92,15 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
private void setupPipeline() {
if (null == this.iterator) {
- this.iterator = this.createTraverserServer ?
- new TraverserServer<>(this.traverserServerPort) :
- Collections.emptyIterator();
- this.pipeline.run().waitUntilFinish();
- if (this.iterator instanceof TraverserServer)
- ((TraverserServer<C, E>) this.iterator).close();
+ if (this.createTraverserServer) {
+ this.iterator = new
TraverserServer<>(this.traverserServerPort);
+ this.pipelineResult = this.pipeline.run();
+ } else {
+ this.iterator = Collections.emptyIterator();
+ this.pipeline.run().waitUntilFinish();
+ }
}
+ if (this.createTraverserServer &&
this.pipelineResult.getState().isTerminal())
+ ((TraverserServer) this.iterator).close();
}
}