This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new da887c2  big moves. MachineServer, RemoteMachine, TraverserServer. We 
now have general support for over-the-network traverser iterators.
da887c2 is described below

commit da887c23c1f18617ff77ede74d69d93a4895fb83
Author: Marko A. Rodriguez <[email protected]>
AuthorDate: Tue Mar 26 11:45:03 2019 -0600

    big moves. MachineServer, RemoteMachine, TraverserServer. We now have 
general support for over-the-network traverser iterators.
---
 .../language/gremlin/TraversalSource.java          |   8 ++
 .../java/org/apache/tinkerpop/machine/Machine.java |   3 +-
 .../tinkerpop/machine/bytecode/Bytecode.java       |   3 +-
 .../tinkerpop/machine/bytecode/BytecodeUtil.java   |   5 +-
 .../tinkerpop/machine/bytecode/Instruction.java    |   3 +-
 .../machine/bytecode/SourceInstruction.java        |   3 +-
 .../tinkerpop/machine/function/map/PathMap.java    |   2 +-
 .../tinkerpop/machine/species/BasicMachine.java    |   6 ++
 .../tinkerpop/machine/species/LocalMachine.java    |   6 ++
 .../tinkerpop/machine/species/RemoteMachine.java   |  52 -----------
 .../MachineServer.java}                            |  73 ++++++++-------
 .../machine/species/remote/RemoteMachine.java      | 100 +++++++++++++++++++++
 .../tinkerpop/machine/species/remote/Request.java  |  57 ++++++++++++
 .../species/{io => remote}/TraverserServer.java    |  17 ++--
 .../tinkerpop/machine/structure/data/JTuple2.java  |   4 +-
 .../tinkerpop/machine/structure/data/TMap.java     |   3 +-
 .../tinkerpop/machine/processor/beam/Beam.java     |  29 +++---
 .../machine/processor/beam/BeamProcessor.java      |  28 +++++-
 .../tinkerpop/machine/processor/beam/OutputFn.java |  26 +++---
 .../tinkerpop/machine/processor/beam/BeamTest.java |  14 ++-
 .../machine/processor/pipes/PipesTest.java         |  12 ++-
 21 files changed, 310 insertions(+), 144 deletions(-)

diff --git 
a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalSource.java
 
b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalSource.java
index fa2d29f..e27f680 100644
--- 
a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalSource.java
+++ 
b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalSource.java
@@ -33,6 +33,8 @@ import 
org.apache.tinkerpop.machine.strategy.verification.CoefficientVerificatio
 import org.apache.tinkerpop.machine.structure.StructureFactory;
 import org.apache.tinkerpop.machine.structure.data.TVertex;
 
+import java.util.Map;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -64,6 +66,12 @@ public class TraversalSource<C> implements Cloneable {
         return clone;
     }
 
+    public TraversalSource<C> withProcessor(final Class<? extends 
ProcessorFactory> processor, final Map<String, Object> configuration) {
+        final TraversalSource<C> clone = this.clone();
+        clone.bytecode.addSourceInstruction(Symbols.WITH_PROCESSOR, processor, 
configuration);
+        return clone;
+    }
+
     public TraversalSource<C> withStructure(final Class<? extends 
StructureFactory> structure) {
         final TraversalSource<C> clone = this.clone();
         clone.bytecode.addSourceInstruction(Symbols.WITH_STRUCTURE, structure);
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/Machine.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/Machine.java
index a4f84e7..fb9074f 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/Machine.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/Machine.java
@@ -21,12 +21,13 @@ package org.apache.tinkerpop.machine;
 import org.apache.tinkerpop.machine.bytecode.Bytecode;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 
+import java.io.Closeable;
 import java.util.Iterator;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface Machine {
+public interface Machine extends Closeable {
 
     public <C> Bytecode<C> register(final Bytecode<C> sourceCode);
 
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java
index bcc1dab..ba77f99 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java
@@ -20,13 +20,14 @@ package org.apache.tinkerpop.machine.bytecode;
 
 import org.apache.tinkerpop.machine.coefficient.Coefficient;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class Bytecode<C> implements Cloneable { // todo: serializable?
+public final class Bytecode<C> implements Cloneable, Serializable {
 
     private List<SourceInstruction> sourceInstructions = new ArrayList<>();
     private List<Instruction<C>> instructions = new ArrayList<>();
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
index 07d4860..aa6bca6 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
@@ -34,6 +34,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
@@ -108,7 +109,9 @@ public final class BytecodeUtil {
             ProcessorFactory processor = null;
             for (final SourceInstruction sourceInstruction : 
bytecode.getSourceInstructions()) {
                 if (sourceInstruction.op().equals(Symbols.WITH_PROCESSOR)) {
-                    processor = ((Class<? extends ProcessorFactory>) 
sourceInstruction.args()[0]).getConstructor().newInstance();
+                    processor = 1 == sourceInstruction.args().length ?
+                            ((Class<? extends ProcessorFactory>) 
sourceInstruction.args()[0]).getConstructor().newInstance() :
+                            ((Class<? extends ProcessorFactory>) 
sourceInstruction.args()[0]).getConstructor(Map.class).newInstance((Map) 
sourceInstruction.args()[1]);
                 }
             }
             return Optional.ofNullable(processor);
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Instruction.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Instruction.java
index 6df4a20..48088b8 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Instruction.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Instruction.java
@@ -21,12 +21,13 @@ package org.apache.tinkerpop.machine.bytecode;
 import org.apache.tinkerpop.machine.coefficient.Coefficient;
 import org.apache.tinkerpop.machine.util.StringFactory;
 
+import java.io.Serializable;
 import java.util.Arrays;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class Instruction<C> {
+public final class Instruction<C> implements Serializable {
 
     private final Coefficient<C> coefficient;
     private final String op;
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/SourceInstruction.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/SourceInstruction.java
index 6b484fc..2c54483 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/SourceInstruction.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/SourceInstruction.java
@@ -20,12 +20,13 @@ package org.apache.tinkerpop.machine.bytecode;
 
 import org.apache.tinkerpop.machine.util.StringFactory;
 
+import java.io.Serializable;
 import java.util.Arrays;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class SourceInstruction {
+public final class SourceInstruction implements Serializable {
 
     private final String op;
     private final Object[] args;
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/PathMap.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/PathMap.java
index ac06a21..75333a8 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/PathMap.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/PathMap.java
@@ -76,7 +76,7 @@ public final class PathMap<C, S> extends AbstractFunction<C> 
implements MapFunct
         final List<Compilation<C, Object, Object>> compilations = new 
ArrayList<>();
         boolean processingLabels = true;
         for (final Object arg : instruction.args()) {
-            if ("|" == arg) {
+            if (arg.equals("|")) {
                 processingLabels = false;
                 continue;
             }
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/BasicMachine.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/BasicMachine.java
index 43b4946..2bf8b2c 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/BasicMachine.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/BasicMachine.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.machine.bytecode.Bytecode;
 import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 
+import java.io.IOException;
 import java.util.Iterator;
 
 /**
@@ -52,4 +53,9 @@ public final class BasicMachine implements Machine {
     public static Machine open() {
         return new BasicMachine();
     }
+
+    @Override
+    public void close() throws IOException {
+
+    }
 }
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/LocalMachine.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/LocalMachine.java
index 042124c..9e192ee 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/LocalMachine.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/LocalMachine.java
@@ -26,6 +26,7 @@ import 
org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
 import org.apache.tinkerpop.machine.bytecode.compiler.SourceCompilation;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
@@ -91,4 +92,9 @@ public final class LocalMachine implements Machine {
         }
         return Optional.empty();
     }
+
+    @Override
+    public void close() throws IOException {
+        this.sources.clear();
+    }
 }
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/RemoteMachine.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/RemoteMachine.java
deleted file mode 100644
index 00da1c6..0000000
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/RemoteMachine.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.machine.species;
-
-import org.apache.tinkerpop.machine.Machine;
-import org.apache.tinkerpop.machine.bytecode.Bytecode;
-import org.apache.tinkerpop.machine.traverser.Traverser;
-
-import java.util.Iterator;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class RemoteMachine implements Machine {
-
-    private final int port;
-
-    public RemoteMachine(final int port) {
-        this.port = port;
-    }
-
-    @Override
-    public <C> Bytecode<C> register(final Bytecode<C> sourceCode) {
-        return null;
-    }
-
-    @Override
-    public <C, E> Iterator<Traverser<C, E>> submit(final Bytecode<C> bytecode) 
{
-        return null;
-    }
-
-    @Override
-    public <C> void close(final Bytecode<C> sourceCode) {
-
-    }
-}
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/io/TraverserServer.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java
similarity index 57%
copy from 
java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/io/TraverserServer.java
copy to 
java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java
index a6b565b..8c9377a 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/io/TraverserServer.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java
@@ -16,14 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.species.io;
+package org.apache.tinkerpop.machine.species.remote;
 
+import org.apache.tinkerpop.machine.Machine;
+import org.apache.tinkerpop.machine.species.LocalMachine;
 import org.apache.tinkerpop.machine.traverser.Traverser;
-import org.apache.tinkerpop.machine.traverser.TraverserSet;
+import org.apache.tinkerpop.machine.traverser.species.EmptyTraverser;
 
+import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.util.Iterator;
@@ -32,65 +36,45 @@ import java.util.concurrent.atomic.AtomicBoolean;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class TraverserServer<C, S> implements Runnable, 
Iterator<Traverser<C, S>> {
+public class MachineServer implements Runnable, Closeable {
 
-    private final TraverserSet<C, S> traverserSet = new TraverserSet<>();
     private final int serverPort;
     private ServerSocket serverSocket;
-    private AtomicBoolean serverAlive = new AtomicBoolean(Boolean.FALSE);
+    private AtomicBoolean serverAlive = new AtomicBoolean(Boolean.TRUE);
+    private final Machine machine = LocalMachine.open();
 
-    public TraverserServer(final int serverPort) {
+    public MachineServer(final int serverPort) {
         this.serverPort = serverPort;
+        new Thread(this).start();
     }
 
     public void run() {
         try {
             this.serverSocket = new ServerSocket(this.serverPort);
-            this.serverAlive.set(Boolean.TRUE);
-            // System.out.println("Server started: " + 
this.serverSocket.toString());
             while (this.isAlive()) {
                 final Socket clientSocket = this.serverSocket.accept();
                 new Thread(new Worker(clientSocket)).start();
             }
-            // System.out.println("Server Stopped.");
         } catch (final Exception e) {
             if (this.serverAlive.get())
                 throw new RuntimeException(e.getMessage(), e);
         }
     }
 
-
     private boolean isAlive() {
         return this.serverAlive.get();
     }
 
-    public synchronized void stop() {
+    public synchronized void close() {
         try {
             this.serverAlive.set(Boolean.FALSE);
             this.serverSocket.close();
+            this.machine.close();
         } catch (final IOException e) {
             throw new RuntimeException(e.getMessage(), e);
         }
     }
 
-    @Override
-    public boolean hasNext() {
-        if (!this.traverserSet.isEmpty())
-            return true;
-        else {
-            while (this.isAlive()) {
-                if (!this.traverserSet.isEmpty())
-                    return true;
-            }
-            return !this.traverserSet.isEmpty();
-        }
-    }
-
-    @Override
-    public Traverser<C, S> next() {
-        return this.traverserSet.remove();
-    }
-
     public class Worker implements Runnable {
 
         private final Socket clientSocket;
@@ -100,16 +84,31 @@ public final class TraverserServer<C, S> implements 
Runnable, Iterator<Traverser
         }
 
         public void run() {
-            //int counter = 0;
             try {
-                //System.out.println("Client connected: " + 
this.clientSocket.toString());
                 final ObjectInputStream input = new 
ObjectInputStream(this.clientSocket.getInputStream());
+                final ObjectOutputStream output = new 
ObjectOutputStream(this.clientSocket.getOutputStream());
                 while (true) {
-                    final Traverser<C, S> traverser = (Traverser<C, S>) 
input.readObject();
-                    //System.out.println("Received traverser [" + 
this.clientSocket.getPort() + "]: " + traverser);
-                    traverserSet.add(traverser);
+                    final Request<Object> request = (Request) 
input.readObject();
+                    if (Request.Type.register == request.type) {
+                        output.writeObject(machine.register(request.bytecode));
+                        output.flush();
+                    } else if (Request.Type.submit == request.type) {
+                        final Socket traverserServerSocket = new 
Socket(request.traverserServerLocation, request.traverserServerPort);
+                        final ObjectOutputStream traverserOutput = new 
ObjectOutputStream(traverserServerSocket.getOutputStream());
+                        final Iterator<Traverser<Object, Object>> iterator = 
machine.submit(request.bytecode);
+                        while (iterator.hasNext()) {
+                            traverserOutput.writeObject(iterator.next());
+                            traverserOutput.flush();
+                        }
+                        traverserOutput.writeObject(EmptyTraverser.instance());
+                        traverserOutput.flush();
+                        traverserOutput.close();
+                    } else if (Request.Type.close == request.type) {
+                        machine.close(request.bytecode);
+                    } else {
+                        throw new IllegalStateException("This shouldn't 
happen: " + request);
+                    }
                 }
-                //System.out.println(this.toString() + ": is complete..." + 
counter);
             } catch (final EOFException e) {
                 // okay -- this is how the worker closes
             } catch (final IOException | ClassNotFoundException e) {
@@ -117,6 +116,4 @@ public final class TraverserServer<C, S> implements 
Runnable, Iterator<Traverser
             }
         }
     }
-
-
-}
+}
\ No newline at end of file
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/RemoteMachine.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/RemoteMachine.java
new file mode 100644
index 0000000..8874b2b
--- /dev/null
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/RemoteMachine.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.species.remote;
+
+import org.apache.tinkerpop.machine.Machine;
+import org.apache.tinkerpop.machine.bytecode.Bytecode;
+import org.apache.tinkerpop.machine.traverser.Traverser;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.Iterator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class RemoteMachine implements Machine {
+
+    private final String localTraverserServerLocation;
+    private final int localTraverserServerPort;
+    private final Socket machineServer;
+    private final ObjectInputStream inputStream;
+    private final ObjectOutputStream outputStream;
+
+
+    private RemoteMachine(final int localTraverserServerPort, final String 
machineLocation, final int machinePort) {
+        try {
+            this.localTraverserServerLocation = 
InetAddress.getLocalHost().getHostAddress();
+            this.localTraverserServerPort = localTraverserServerPort;
+            this.machineServer = new Socket(machineLocation, machinePort);
+            this.outputStream = new 
ObjectOutputStream(machineServer.getOutputStream());
+            this.inputStream = new 
ObjectInputStream(machineServer.getInputStream());
+        } catch (final Exception e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public <C> Bytecode<C> register(final Bytecode<C> sourceCode) {
+        try {
+            this.outputStream.writeObject(Request.register(sourceCode));
+            this.outputStream.flush();
+            return (Bytecode<C>) this.inputStream.readObject();
+        } catch (final IOException | ClassNotFoundException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public <C, E> Iterator<Traverser<C, E>> submit(final Bytecode<C> bytecode) 
{
+        try {
+            final TraverserServer<C, E> traverserServer = new 
TraverserServer<>(this.localTraverserServerPort);
+            new Thread(traverserServer).start();
+            this.outputStream.writeObject(Request.submit(bytecode, 
this.localTraverserServerLocation, this.localTraverserServerPort));
+            this.outputStream.flush();
+            return traverserServer;
+        } catch (final IOException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public <C> void close(final Bytecode<C> sourceCode) {
+        try {
+            this.outputStream.writeObject(Request.close(sourceCode));
+            this.outputStream.flush();
+        } catch (final IOException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    public static Machine open(final int localTraverserServerPort, final 
String machineLocation, final int machinePort) {
+        return new RemoteMachine(localTraverserServerPort, machineLocation, 
machinePort);
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.inputStream.close();
+        this.outputStream.close();
+        this.machineServer.close();
+    }
+}
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/Request.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/Request.java
new file mode 100644
index 0000000..27c1edc
--- /dev/null
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/Request.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.species.remote;
+
+import org.apache.tinkerpop.machine.bytecode.Bytecode;
+
+import java.io.Serializable;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+final class Request<C> implements Serializable {
+
+    public enum Type {
+        register, submit, close;
+    }
+
+    public final Type type;
+    public final Bytecode<C> bytecode;
+    public final String traverserServerLocation;
+    public final int traverserServerPort;
+
+    private Request(final Type type, final Bytecode<C> bytecode, final String 
traverserServerLocation, final int traverserServerPort) {
+        this.type = type;
+        this.bytecode = bytecode;
+        this.traverserServerLocation = traverserServerLocation;
+        this.traverserServerPort = traverserServerPort;
+    }
+
+    static <C> Request<C> register(final Bytecode<C> bytecode) {
+        return new Request<>(Type.register, bytecode, null, -1);
+    }
+
+    static <C> Request<C> submit(final Bytecode<C> bytecode, final String 
traverserServerLocation, final int traverserServerPort) {
+        return new Request<>(Type.submit, bytecode, traverserServerLocation, 
traverserServerPort);
+    }
+
+    static <C> Request<C> close(final Bytecode<C> bytecode) {
+        return new Request<>(Type.close, bytecode, null, -1);
+    }
+}
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/io/TraverserServer.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/TraverserServer.java
similarity index 86%
rename from 
java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/io/TraverserServer.java
rename to 
java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/TraverserServer.java
index a6b565b..4bb1683 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/io/TraverserServer.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/TraverserServer.java
@@ -16,10 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.species.io;
+package org.apache.tinkerpop.machine.species.remote;
 
 import org.apache.tinkerpop.machine.traverser.Traverser;
 import org.apache.tinkerpop.machine.traverser.TraverserSet;
+import org.apache.tinkerpop.machine.traverser.species.EmptyTraverser;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -37,7 +38,7 @@ public final class TraverserServer<C, S> implements Runnable, 
Iterator<Traverser
     private final TraverserSet<C, S> traverserSet = new TraverserSet<>();
     private final int serverPort;
     private ServerSocket serverSocket;
-    private AtomicBoolean serverAlive = new AtomicBoolean(Boolean.FALSE);
+    private AtomicBoolean serverAlive = new AtomicBoolean(Boolean.TRUE);
 
     public TraverserServer(final int serverPort) {
         this.serverPort = serverPort;
@@ -46,20 +47,16 @@ public final class TraverserServer<C, S> implements 
Runnable, Iterator<Traverser
     public void run() {
         try {
             this.serverSocket = new ServerSocket(this.serverPort);
-            this.serverAlive.set(Boolean.TRUE);
-            // System.out.println("Server started: " + 
this.serverSocket.toString());
             while (this.isAlive()) {
                 final Socket clientSocket = this.serverSocket.accept();
                 new Thread(new Worker(clientSocket)).start();
             }
-            // System.out.println("Server Stopped.");
         } catch (final Exception e) {
             if (this.serverAlive.get())
                 throw new RuntimeException(e.getMessage(), e);
         }
     }
 
-
     private boolean isAlive() {
         return this.serverAlive.get();
     }
@@ -100,16 +97,16 @@ public final class TraverserServer<C, S> implements 
Runnable, Iterator<Traverser
         }
 
         public void run() {
-            //int counter = 0;
             try {
-                //System.out.println("Client connected: " + 
this.clientSocket.toString());
                 final ObjectInputStream input = new 
ObjectInputStream(this.clientSocket.getInputStream());
                 while (true) {
                     final Traverser<C, S> traverser = (Traverser<C, S>) 
input.readObject();
-                    //System.out.println("Received traverser [" + 
this.clientSocket.getPort() + "]: " + traverser);
+                    if (traverser instanceof EmptyTraverser) {
+                        stop();
+                        break;
+                    }
                     traverserSet.add(traverser);
                 }
-                //System.out.println(this.toString() + ": is complete..." + 
counter);
             } catch (final EOFException e) {
                 // okay -- this is how the worker closes
             } catch (final IOException | ClassNotFoundException e) {
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/structure/data/JTuple2.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/structure/data/JTuple2.java
index cf357b4..e8efb9c 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/structure/data/JTuple2.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/structure/data/JTuple2.java
@@ -18,10 +18,12 @@
  */
 package org.apache.tinkerpop.machine.structure.data;
 
+import java.io.Serializable;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class JTuple2<A, B> implements TTuple2<A, B> {
+public final class JTuple2<A, B> implements TTuple2<A, B>, Serializable {
 
     private final A a;
     private final B b;
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/structure/data/TMap.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/structure/data/TMap.java
index 70cbb52..0a4471b 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/structure/data/TMap.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/structure/data/TMap.java
@@ -18,12 +18,13 @@
  */
 package org.apache.tinkerpop.machine.structure.data;
 
+import java.io.Serializable;
 import java.util.Iterator;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface TMap<K, V> {
+public interface TMap<K, V> extends Serializable {
 
     public void set(final K key, final V value);
 
diff --git a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
index b984670..6db2e6f 100644
--- a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
+++ b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
@@ -28,10 +28,11 @@ import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.processor.beam.serialization.TraverserCoder;
 import org.apache.tinkerpop.machine.processor.beam.util.ExecutionPlanner;
 import org.apache.tinkerpop.machine.processor.beam.util.TopologyUtil;
-import org.apache.tinkerpop.machine.species.io.TraverserServer;
+import org.apache.tinkerpop.machine.species.remote.TraverserServer;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 import org.apache.tinkerpop.machine.traverser.species.EmptyTraverser;
 
+import java.util.Collections;
 import java.util.Iterator;
 
 /**
@@ -39,20 +40,22 @@ import java.util.Iterator;
  */
 public class Beam<C, S, E> implements Processor<C, S, E> {
 
-    public static final int RESULT_SERVER_PORT = 6532; // TODO: this needs to be a dynamic configuration
     public static final int MAX_REPETIONS = 15; // TODO: this needs to be a dynamic configuration
-
+    private boolean createTraverserServer;
+    private final int traverserServerPort;
     private final Pipeline pipeline;
     private Iterator<Traverser<C, E>> iterator = null;
 
-    public Beam(final Compilation<C, S, E> compilation) {
+    public Beam(final Compilation<C, S, E> compilation, final String traverserServerLocation, final int traverserServerPort, final boolean createTraverserServer) {
+        this.traverserServerPort = traverserServerPort;
+        this.createTraverserServer = createTraverserServer;
+        ///
         this.pipeline = Pipeline.create();
         this.pipeline.getOptions().setRunner(new PipelineOptions.DirectRunner().create(this.pipeline.getOptions()));
         final PCollection<Traverser<C, S>> source = this.pipeline.apply(Create.of(EmptyTraverser.instance()));
         source.setCoder(new TraverserCoder<>());
         final PCollection<Traverser<C, E>> sink = TopologyUtil.compile(source, compilation);
-        sink.apply(ParDo.of(new OutputFn<>("localhost", RESULT_SERVER_PORT)));
-
+        sink.apply(ParDo.of(new OutputFn<>(traverserServerLocation, this.traverserServerPort)));
     }
 
     @Override
@@ -84,14 +87,16 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
         return visitor.toString();
     }
 
-    private final void setupPipeline() {
+    private void setupPipeline() {
         if (null == this.iterator) {
-            final TraverserServer<C, E> server = new TraverserServer<>(Beam.RESULT_SERVER_PORT);
-            new Thread(server).start();
+            if (this.createTraverserServer) {
+                this.iterator = new TraverserServer<>(this.traverserServerPort);
+                new Thread((TraverserServer) this.iterator).start();
+            } else
+                this.iterator = Collections.emptyIterator();
             this.pipeline.run().waitUntilFinish();
-            server.stop();
-            this.iterator = server;
+            if (this.iterator instanceof TraverserServer)
+                ((TraverserServer<C, E>) this.iterator).stop();
         }
     }
-
 }
diff --git a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BeamProcessor.java b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BeamProcessor.java
index 87d3481..53ed990 100644
--- a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BeamProcessor.java
+++ b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BeamProcessor.java
@@ -19,17 +19,19 @@
 package org.apache.tinkerpop.machine.processor.beam;
 
 import org.apache.tinkerpop.machine.bytecode.compiler.BytecodeCompiler;
-import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
 import org.apache.tinkerpop.machine.bytecode.compiler.CommonCompiler;
+import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
 import org.apache.tinkerpop.machine.bytecode.compiler.CoreCompiler;
 import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.processor.ProcessorFactory;
 import org.apache.tinkerpop.machine.processor.beam.strategy.BeamStrategy;
 import org.apache.tinkerpop.machine.strategy.Strategy;
 
-import java.util.Arrays;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -38,10 +40,30 @@ import java.util.Set;
 public class BeamProcessor implements ProcessorFactory {
 
     private static final List<BytecodeCompiler> COMPILERS = List.of(CoreCompiler.instance(), CommonCompiler.instance());
+    private static final int DEFAULT_SERVER_PORT = 8888;
+    public static final String TRAVERSER_SERVER_LOCATION = "beam.traverserServer.location";
+    public static final String TRAVERSER_SERVER_PORT = "beam.traverserServer.port";
+
+    private final Map<String, Object> configuration;
+
+    public BeamProcessor() {
+        this.configuration = Collections.emptyMap();
+    }
+
+    public BeamProcessor(final Map<String, Object> configuration) {
+        this.configuration = configuration;
+    }
 
     @Override
     public <C, S, E> Processor<C, S, E> mint(final Compilation<C, S, E> compilation) {
-        return new Beam<>(compilation);
+        try {
+            return new Beam<>(compilation,
+                    (String) this.configuration.getOrDefault(TRAVERSER_SERVER_LOCATION, InetAddress.getLocalHost().getHostAddress()),
+                    (Integer) this.configuration.getOrDefault(TRAVERSER_SERVER_PORT, DEFAULT_SERVER_PORT),
+                    !this.configuration.containsKey(TRAVERSER_SERVER_LOCATION));
+        } catch (final UnknownHostException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
     }
 
     @Override
diff --git a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/OutputFn.java b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/OutputFn.java
index c270456..ee31a1a 100644
--- a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/OutputFn.java
+++ b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/OutputFn.java
@@ -30,14 +30,14 @@ import java.net.Socket;
  */
 public class OutputFn<C, S> extends DoFn<Traverser<C, S>, Void> {
 
-    private final String resultServerLocation;
-    private final int resultServerPort;
-    private Socket resultServer;
+    private final String traverserServerLocation;
+    private final int traverserServerPort;
+    private Socket traverserServerSocket;
     private ObjectOutputStream outputStream;
 
-    OutputFn(final String resultServerLocation, final int resultServerPort) {
-        this.resultServerLocation = resultServerLocation;
-        this.resultServerPort = resultServerPort;
+    OutputFn(final String traverserServerLocation, final int traverserServerPort) {
+        this.traverserServerLocation = traverserServerLocation;
+        this.traverserServerPort = traverserServerPort;
     }
 
     @ProcessElement
@@ -52,16 +52,13 @@ public class OutputFn<C, S> extends DoFn<Traverser<C, S>, Void> {
     @StartBundle
     public void startBundle() {
         // only create a connection if results are generated
-        if (null == this.resultServer) {
+        if (null == this.traverserServerSocket) {
             try {
-                //System.out.println("setting up client: " + this.toString());
-                this.resultServer = new Socket(this.resultServerLocation, this.resultServerPort);
-                this.outputStream = new ObjectOutputStream(this.resultServer.getOutputStream());
-                //System.out.println("Connected to server: " + this.resultServer.toString());
+                this.traverserServerSocket = new Socket(this.traverserServerLocation, this.traverserServerPort);
+                this.outputStream = new ObjectOutputStream(this.traverserServerSocket.getOutputStream());
             } catch (final Exception e) {
                 throw new RuntimeException(e.getMessage(), e);
             }
-            //System.out.println(this.toString() + " client setup");
         }
     }
 
@@ -76,11 +73,10 @@ public class OutputFn<C, S> extends DoFn<Traverser<C, S>, Void> {
 
     @Teardown
     public void stop() {
-        if (null != this.resultServer) {
+        if (null != this.traverserServerSocket) {
             try {
-                // System.out.println(this.toString() + " client stopping");
                 this.outputStream.flush();
-                this.resultServer.close();
+                this.traverserServerSocket.close();
             } catch (final Exception e) {
                 throw new RuntimeException(e.getMessage(), e);
             }
diff --git a/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java b/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java
index ac28f62..5e00a82 100644
--- a/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java
+++ b/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java
@@ -24,14 +24,17 @@ import org.apache.tinkerpop.language.gremlin.Traversal;
 import org.apache.tinkerpop.language.gremlin.TraversalSource;
 import org.apache.tinkerpop.language.gremlin.TraversalUtil;
 import org.apache.tinkerpop.language.gremlin.core.__;
-import org.apache.tinkerpop.machine.species.LocalMachine;
 import org.apache.tinkerpop.machine.Machine;
 import org.apache.tinkerpop.machine.coefficient.LongCoefficient;
+import org.apache.tinkerpop.machine.species.LocalMachine;
+import org.apache.tinkerpop.machine.species.remote.MachineServer;
+import org.apache.tinkerpop.machine.species.remote.RemoteMachine;
 import org.apache.tinkerpop.machine.strategy.optimization.IdentityStrategy;
 import org.apache.tinkerpop.machine.structure.blueprints.BlueprintsStructure;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.tinkerpop.language.gremlin.core.__.constant;
 import static org.apache.tinkerpop.language.gremlin.core.__.incr;
@@ -66,11 +69,12 @@ public class BeamTest {
     }
 
     @Test
-    public void shouldWork() {
-        final Machine machine = LocalMachine.open();
+    public void shouldWork() throws Exception {
+        final MachineServer server = new MachineServer(7777);
+        final Machine machine = RemoteMachine.open(6666, "localhost", 7777);
         final TraversalSource<Long> g = Gremlin.<Long>traversal(machine)
                 //.withCoefficient(LongCoefficient.class)
-                .withProcessor(BeamProcessor.class)
+                .withProcessor(BeamProcessor.class, Map.of("beam.traverserServer.location", "localhost", "beam.traverserServer.port", 6666))
                 .withStrategy(IdentityStrategy.class);
 
         Traversal<Long, ?, ?> traversal = g.inject(List.of(1L, 1L)).<Long>unfold().map(incr()).c(4L).repeat(incr()).until(__.is(__.constant(8L).incr().incr())).sum();
@@ -124,5 +128,7 @@ public class BeamTest {
         System.out.println(traversal.toList());
 
         g.close();
+        machine.close();
+        server.close();
     }
 }
diff --git a/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java b/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java
index a9ef4b7..5445757 100644
--- a/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java
+++ b/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java
@@ -24,9 +24,11 @@ import org.apache.tinkerpop.language.gremlin.Traversal;
 import org.apache.tinkerpop.language.gremlin.TraversalSource;
 import org.apache.tinkerpop.language.gremlin.TraversalUtil;
 import org.apache.tinkerpop.language.gremlin.common.__;
-import org.apache.tinkerpop.machine.species.LocalMachine;
 import org.apache.tinkerpop.machine.Machine;
 import org.apache.tinkerpop.machine.coefficient.LongCoefficient;
+import org.apache.tinkerpop.machine.species.LocalMachine;
+import org.apache.tinkerpop.machine.species.remote.MachineServer;
+import org.apache.tinkerpop.machine.species.remote.RemoteMachine;
 import org.apache.tinkerpop.machine.strategy.optimization.IdentityStrategy;
 import org.apache.tinkerpop.machine.structure.blueprints.BlueprintsStructure;
 import org.apache.tinkerpop.machine.structure.data.JMap;
@@ -61,7 +63,8 @@ public class PipesTest {
 
     @Test
     public void shouldWork() {
-        final Machine machine = LocalMachine.open();
+        final MachineServer server = new MachineServer(7777);
+        final Machine machine = RemoteMachine.open(6666, "localhost", 7777);
         final TraversalSource<Long> g = Gremlin.<Long>traversal(machine)
                 .withCoefficient(LongCoefficient.class)
                 .withProcessor(PipesProcessor.class)
@@ -120,9 +123,14 @@ public class PipesTest {
         traversal = g.inject(7L, 7L, 7L, 2L).incr().barrier();
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
+        System.out.println(traversal.hasNext());
         System.out.println(traversal.nextTraverser());
+        System.out.println(traversal.hasNext());
         System.out.println(traversal.nextTraverser());
         System.out.println(traversal.hasNext());
+        ///
+        g.close();
+        server.close();
 
     }
 

Reply via email to