This is an automated email from the ASF dual-hosted git repository.
okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push:
new da887c2 big moves. MachineServer, RemoteMachine, TraverserServer. We
now have general support for over-the-network traverser iterators.
da887c2 is described below
commit da887c23c1f18617ff77ede74d69d93a4895fb83
Author: Marko A. Rodriguez <[email protected]>
AuthorDate: Tue Mar 26 11:45:03 2019 -0600
big moves. MachineServer, RemoteMachine, TraverserServer. We now have
general support for over-the-network traverser iterators.
---
.../language/gremlin/TraversalSource.java | 8 ++
.../java/org/apache/tinkerpop/machine/Machine.java | 3 +-
.../tinkerpop/machine/bytecode/Bytecode.java | 3 +-
.../tinkerpop/machine/bytecode/BytecodeUtil.java | 5 +-
.../tinkerpop/machine/bytecode/Instruction.java | 3 +-
.../machine/bytecode/SourceInstruction.java | 3 +-
.../tinkerpop/machine/function/map/PathMap.java | 2 +-
.../tinkerpop/machine/species/BasicMachine.java | 6 ++
.../tinkerpop/machine/species/LocalMachine.java | 6 ++
.../tinkerpop/machine/species/RemoteMachine.java | 52 -----------
.../MachineServer.java} | 73 ++++++++-------
.../machine/species/remote/RemoteMachine.java | 100 +++++++++++++++++++++
.../tinkerpop/machine/species/remote/Request.java | 57 ++++++++++++
.../species/{io => remote}/TraverserServer.java | 17 ++--
.../tinkerpop/machine/structure/data/JTuple2.java | 4 +-
.../tinkerpop/machine/structure/data/TMap.java | 3 +-
.../tinkerpop/machine/processor/beam/Beam.java | 29 +++---
.../machine/processor/beam/BeamProcessor.java | 28 +++++-
.../tinkerpop/machine/processor/beam/OutputFn.java | 26 +++---
.../tinkerpop/machine/processor/beam/BeamTest.java | 14 ++-
.../machine/processor/pipes/PipesTest.java | 12 ++-
21 files changed, 310 insertions(+), 144 deletions(-)
diff --git
a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalSource.java
b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalSource.java
index fa2d29f..e27f680 100644
---
a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalSource.java
+++
b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalSource.java
@@ -33,6 +33,8 @@ import
org.apache.tinkerpop.machine.strategy.verification.CoefficientVerificatio
import org.apache.tinkerpop.machine.structure.StructureFactory;
import org.apache.tinkerpop.machine.structure.data.TVertex;
+import java.util.Map;
+
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
@@ -64,6 +66,12 @@ public class TraversalSource<C> implements Cloneable {
return clone;
}
+ public TraversalSource<C> withProcessor(final Class<? extends
ProcessorFactory> processor, final Map<String, Object> configuration) {
+ final TraversalSource<C> clone = this.clone();
+ clone.bytecode.addSourceInstruction(Symbols.WITH_PROCESSOR, processor,
configuration);
+ return clone;
+ }
+
public TraversalSource<C> withStructure(final Class<? extends
StructureFactory> structure) {
final TraversalSource<C> clone = this.clone();
clone.bytecode.addSourceInstruction(Symbols.WITH_STRUCTURE, structure);
diff --git
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/Machine.java
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/Machine.java
index a4f84e7..fb9074f 100644
---
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/Machine.java
+++
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/Machine.java
@@ -21,12 +21,13 @@ package org.apache.tinkerpop.machine;
import org.apache.tinkerpop.machine.bytecode.Bytecode;
import org.apache.tinkerpop.machine.traverser.Traverser;
+import java.io.Closeable;
import java.util.Iterator;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public interface Machine {
+public interface Machine extends Closeable {
public <C> Bytecode<C> register(final Bytecode<C> sourceCode);
diff --git
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java
index bcc1dab..ba77f99 100644
---
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java
+++
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java
@@ -20,13 +20,14 @@ package org.apache.tinkerpop.machine.bytecode;
import org.apache.tinkerpop.machine.coefficient.Coefficient;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class Bytecode<C> implements Cloneable { // todo: serializable?
+public final class Bytecode<C> implements Cloneable, Serializable {
private List<SourceInstruction> sourceInstructions = new ArrayList<>();
private List<Instruction<C>> instructions = new ArrayList<>();
diff --git
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
index 07d4860..aa6bca6 100644
---
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
+++
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
@@ -34,6 +34,7 @@ import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -108,7 +109,9 @@ public final class BytecodeUtil {
ProcessorFactory processor = null;
for (final SourceInstruction sourceInstruction :
bytecode.getSourceInstructions()) {
if (sourceInstruction.op().equals(Symbols.WITH_PROCESSOR)) {
- processor = ((Class<? extends ProcessorFactory>)
sourceInstruction.args()[0]).getConstructor().newInstance();
+ processor = 1 == sourceInstruction.args().length ?
+ ((Class<? extends ProcessorFactory>)
sourceInstruction.args()[0]).getConstructor().newInstance() :
+ ((Class<? extends ProcessorFactory>)
sourceInstruction.args()[0]).getConstructor(Map.class).newInstance((Map)
sourceInstruction.args()[1]);
}
}
return Optional.ofNullable(processor);
diff --git
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Instruction.java
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Instruction.java
index 6df4a20..48088b8 100644
---
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Instruction.java
+++
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Instruction.java
@@ -21,12 +21,13 @@ package org.apache.tinkerpop.machine.bytecode;
import org.apache.tinkerpop.machine.coefficient.Coefficient;
import org.apache.tinkerpop.machine.util.StringFactory;
+import java.io.Serializable;
import java.util.Arrays;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class Instruction<C> {
+public final class Instruction<C> implements Serializable {
private final Coefficient<C> coefficient;
private final String op;
diff --git
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/SourceInstruction.java
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/SourceInstruction.java
index 6b484fc..2c54483 100644
---
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/SourceInstruction.java
+++
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/SourceInstruction.java
@@ -20,12 +20,13 @@ package org.apache.tinkerpop.machine.bytecode;
import org.apache.tinkerpop.machine.util.StringFactory;
+import java.io.Serializable;
import java.util.Arrays;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class SourceInstruction {
+public final class SourceInstruction implements Serializable {
private final String op;
private final Object[] args;
diff --git
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/PathMap.java
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/PathMap.java
index ac06a21..75333a8 100644
---
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/PathMap.java
+++
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/PathMap.java
@@ -76,7 +76,7 @@ public final class PathMap<C, S> extends AbstractFunction<C>
implements MapFunct
final List<Compilation<C, Object, Object>> compilations = new
ArrayList<>();
boolean processingLabels = true;
for (final Object arg : instruction.args()) {
- if ("|" == arg) {
+ if (arg.equals("|")) {
processingLabels = false;
continue;
}
diff --git
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/BasicMachine.java
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/BasicMachine.java
index 43b4946..2bf8b2c 100644
---
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/BasicMachine.java
+++
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/BasicMachine.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.machine.bytecode.Bytecode;
import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
import org.apache.tinkerpop.machine.traverser.Traverser;
+import java.io.IOException;
import java.util.Iterator;
/**
@@ -52,4 +53,9 @@ public final class BasicMachine implements Machine {
public static Machine open() {
return new BasicMachine();
}
+
+ @Override
+ public void close() throws IOException {
+
+ }
}
diff --git
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/LocalMachine.java
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/LocalMachine.java
index 042124c..9e192ee 100644
---
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/LocalMachine.java
+++
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/LocalMachine.java
@@ -26,6 +26,7 @@ import
org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
import org.apache.tinkerpop.machine.bytecode.compiler.SourceCompilation;
import org.apache.tinkerpop.machine.traverser.Traverser;
+import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
@@ -91,4 +92,9 @@ public final class LocalMachine implements Machine {
}
return Optional.empty();
}
+
+ @Override
+ public void close() throws IOException {
+ this.sources.clear();
+ }
}
diff --git
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/RemoteMachine.java
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/RemoteMachine.java
deleted file mode 100644
index 00da1c6..0000000
---
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/RemoteMachine.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.machine.species;
-
-import org.apache.tinkerpop.machine.Machine;
-import org.apache.tinkerpop.machine.bytecode.Bytecode;
-import org.apache.tinkerpop.machine.traverser.Traverser;
-
-import java.util.Iterator;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class RemoteMachine implements Machine {
-
- private final int port;
-
- public RemoteMachine(final int port) {
- this.port = port;
- }
-
- @Override
- public <C> Bytecode<C> register(final Bytecode<C> sourceCode) {
- return null;
- }
-
- @Override
- public <C, E> Iterator<Traverser<C, E>> submit(final Bytecode<C> bytecode)
{
- return null;
- }
-
- @Override
- public <C> void close(final Bytecode<C> sourceCode) {
-
- }
-}
diff --git
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/io/TraverserServer.java
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java
similarity index 57%
copy from
java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/io/TraverserServer.java
copy to
java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java
index a6b565b..8c9377a 100644
---
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/io/TraverserServer.java
+++
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java
@@ -16,14 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.tinkerpop.machine.species.io;
+package org.apache.tinkerpop.machine.species.remote;
+import org.apache.tinkerpop.machine.Machine;
+import org.apache.tinkerpop.machine.species.LocalMachine;
import org.apache.tinkerpop.machine.traverser.Traverser;
-import org.apache.tinkerpop.machine.traverser.TraverserSet;
+import org.apache.tinkerpop.machine.traverser.species.EmptyTraverser;
+import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Iterator;
@@ -32,65 +36,45 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class TraverserServer<C, S> implements Runnable,
Iterator<Traverser<C, S>> {
+public class MachineServer implements Runnable, Closeable {
- private final TraverserSet<C, S> traverserSet = new TraverserSet<>();
private final int serverPort;
private ServerSocket serverSocket;
- private AtomicBoolean serverAlive = new AtomicBoolean(Boolean.FALSE);
+ private AtomicBoolean serverAlive = new AtomicBoolean(Boolean.TRUE);
+ private final Machine machine = LocalMachine.open();
- public TraverserServer(final int serverPort) {
+ public MachineServer(final int serverPort) {
this.serverPort = serverPort;
+ new Thread(this).start();
}
public void run() {
try {
this.serverSocket = new ServerSocket(this.serverPort);
- this.serverAlive.set(Boolean.TRUE);
- // System.out.println("Server started: " +
this.serverSocket.toString());
while (this.isAlive()) {
final Socket clientSocket = this.serverSocket.accept();
new Thread(new Worker(clientSocket)).start();
}
- // System.out.println("Server Stopped.");
} catch (final Exception e) {
if (this.serverAlive.get())
throw new RuntimeException(e.getMessage(), e);
}
}
-
private boolean isAlive() {
return this.serverAlive.get();
}
- public synchronized void stop() {
+ public synchronized void close() {
try {
this.serverAlive.set(Boolean.FALSE);
this.serverSocket.close();
+ this.machine.close();
} catch (final IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
- @Override
- public boolean hasNext() {
- if (!this.traverserSet.isEmpty())
- return true;
- else {
- while (this.isAlive()) {
- if (!this.traverserSet.isEmpty())
- return true;
- }
- return !this.traverserSet.isEmpty();
- }
- }
-
- @Override
- public Traverser<C, S> next() {
- return this.traverserSet.remove();
- }
-
public class Worker implements Runnable {
private final Socket clientSocket;
@@ -100,16 +84,31 @@ public final class TraverserServer<C, S> implements
Runnable, Iterator<Traverser
}
public void run() {
- //int counter = 0;
try {
- //System.out.println("Client connected: " +
this.clientSocket.toString());
final ObjectInputStream input = new
ObjectInputStream(this.clientSocket.getInputStream());
+ final ObjectOutputStream output = new
ObjectOutputStream(this.clientSocket.getOutputStream());
while (true) {
- final Traverser<C, S> traverser = (Traverser<C, S>)
input.readObject();
- //System.out.println("Received traverser [" +
this.clientSocket.getPort() + "]: " + traverser);
- traverserSet.add(traverser);
+ final Request<Object> request = (Request)
input.readObject();
+ if (Request.Type.register == request.type) {
+ output.writeObject(machine.register(request.bytecode));
+ output.flush();
+ } else if (Request.Type.submit == request.type) {
+ final Socket traverserServerSocket = new
Socket(request.traverserServerLocation, request.traverserServerPort);
+ final ObjectOutputStream traverserOutput = new
ObjectOutputStream(traverserServerSocket.getOutputStream());
+ final Iterator<Traverser<Object, Object>> iterator =
machine.submit(request.bytecode);
+ while (iterator.hasNext()) {
+ traverserOutput.writeObject(iterator.next());
+ traverserOutput.flush();
+ }
+ traverserOutput.writeObject(EmptyTraverser.instance());
+ traverserOutput.flush();
+ traverserOutput.close();
+ } else if (Request.Type.close == request.type) {
+ machine.close(request.bytecode);
+ } else {
+ throw new IllegalStateException("This shouldn't
happen: " + request);
+ }
}
- //System.out.println(this.toString() + ": is complete..." +
counter);
} catch (final EOFException e) {
// okay -- this is how the worker closes
} catch (final IOException | ClassNotFoundException e) {
@@ -117,6 +116,4 @@ public final class TraverserServer<C, S> implements
Runnable, Iterator<Traverser
}
}
}
-
-
-}
+}
\ No newline at end of file
diff --git
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/RemoteMachine.java
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/RemoteMachine.java
new file mode 100644
index 0000000..8874b2b
--- /dev/null
+++
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/RemoteMachine.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.species.remote;
+
+import org.apache.tinkerpop.machine.Machine;
+import org.apache.tinkerpop.machine.bytecode.Bytecode;
+import org.apache.tinkerpop.machine.traverser.Traverser;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.Iterator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class RemoteMachine implements Machine {
+
+ private final String localTraverserServerLocation;
+ private final int localTraverserServerPort;
+ private final Socket machineServer;
+ private final ObjectInputStream inputStream;
+ private final ObjectOutputStream outputStream;
+
+
+ private RemoteMachine(final int localTraverserServerPort, final String
machineLocation, final int machinePort) {
+ try {
+ this.localTraverserServerLocation =
InetAddress.getLocalHost().getHostAddress();
+ this.localTraverserServerPort = localTraverserServerPort;
+ this.machineServer = new Socket(machineLocation, machinePort);
+ this.outputStream = new
ObjectOutputStream(machineServer.getOutputStream());
+ this.inputStream = new
ObjectInputStream(machineServer.getInputStream());
+ } catch (final Exception e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public <C> Bytecode<C> register(final Bytecode<C> sourceCode) {
+ try {
+ this.outputStream.writeObject(Request.register(sourceCode));
+ this.outputStream.flush();
+ return (Bytecode<C>) this.inputStream.readObject();
+ } catch (final IOException | ClassNotFoundException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public <C, E> Iterator<Traverser<C, E>> submit(final Bytecode<C> bytecode)
{
+ try {
+ final TraverserServer<C, E> traverserServer = new
TraverserServer<>(this.localTraverserServerPort);
+ new Thread(traverserServer).start();
+ this.outputStream.writeObject(Request.submit(bytecode,
this.localTraverserServerLocation, this.localTraverserServerPort));
+ this.outputStream.flush();
+ return traverserServer;
+ } catch (final IOException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public <C> void close(final Bytecode<C> sourceCode) {
+ try {
+ this.outputStream.writeObject(Request.close(sourceCode));
+ this.outputStream.flush();
+ } catch (final IOException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ public static Machine open(final int localTraverserServerPort, final
String machineLocation, final int machinePort) {
+ return new RemoteMachine(localTraverserServerPort, machineLocation,
machinePort);
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.inputStream.close();
+ this.outputStream.close();
+ this.machineServer.close();
+ }
+}
diff --git
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/Request.java
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/Request.java
new file mode 100644
index 0000000..27c1edc
--- /dev/null
+++
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/Request.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.species.remote;
+
+import org.apache.tinkerpop.machine.bytecode.Bytecode;
+
+import java.io.Serializable;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+final class Request<C> implements Serializable {
+
+ public enum Type {
+ register, submit, close;
+ }
+
+ public final Type type;
+ public final Bytecode<C> bytecode;
+ public final String traverserServerLocation;
+ public final int traverserServerPort;
+
+ private Request(final Type type, final Bytecode<C> bytecode, final String
traverserServerLocation, final int traverserServerPort) {
+ this.type = type;
+ this.bytecode = bytecode;
+ this.traverserServerLocation = traverserServerLocation;
+ this.traverserServerPort = traverserServerPort;
+ }
+
+ static <C> Request<C> register(final Bytecode<C> bytecode) {
+ return new Request<>(Type.register, bytecode, null, -1);
+ }
+
+ static <C> Request<C> submit(final Bytecode<C> bytecode, final String
traverserServerLocation, final int traverserServerPort) {
+ return new Request<>(Type.submit, bytecode, traverserServerLocation,
traverserServerPort);
+ }
+
+ static <C> Request<C> close(final Bytecode<C> bytecode) {
+ return new Request<>(Type.close, bytecode, null, -1);
+ }
+}
diff --git
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/io/TraverserServer.java
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/TraverserServer.java
similarity index 86%
rename from
java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/io/TraverserServer.java
rename to
java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/TraverserServer.java
index a6b565b..4bb1683 100644
---
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/io/TraverserServer.java
+++
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/TraverserServer.java
@@ -16,10 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.tinkerpop.machine.species.io;
+package org.apache.tinkerpop.machine.species.remote;
import org.apache.tinkerpop.machine.traverser.Traverser;
import org.apache.tinkerpop.machine.traverser.TraverserSet;
+import org.apache.tinkerpop.machine.traverser.species.EmptyTraverser;
import java.io.EOFException;
import java.io.IOException;
@@ -37,7 +38,7 @@ public final class TraverserServer<C, S> implements Runnable,
Iterator<Traverser
private final TraverserSet<C, S> traverserSet = new TraverserSet<>();
private final int serverPort;
private ServerSocket serverSocket;
- private AtomicBoolean serverAlive = new AtomicBoolean(Boolean.FALSE);
+ private AtomicBoolean serverAlive = new AtomicBoolean(Boolean.TRUE);
public TraverserServer(final int serverPort) {
this.serverPort = serverPort;
@@ -46,20 +47,16 @@ public final class TraverserServer<C, S> implements
Runnable, Iterator<Traverser
public void run() {
try {
this.serverSocket = new ServerSocket(this.serverPort);
- this.serverAlive.set(Boolean.TRUE);
- // System.out.println("Server started: " +
this.serverSocket.toString());
while (this.isAlive()) {
final Socket clientSocket = this.serverSocket.accept();
new Thread(new Worker(clientSocket)).start();
}
- // System.out.println("Server Stopped.");
} catch (final Exception e) {
if (this.serverAlive.get())
throw new RuntimeException(e.getMessage(), e);
}
}
-
private boolean isAlive() {
return this.serverAlive.get();
}
@@ -100,16 +97,16 @@ public final class TraverserServer<C, S> implements
Runnable, Iterator<Traverser
}
public void run() {
- //int counter = 0;
try {
- //System.out.println("Client connected: " +
this.clientSocket.toString());
final ObjectInputStream input = new
ObjectInputStream(this.clientSocket.getInputStream());
while (true) {
final Traverser<C, S> traverser = (Traverser<C, S>)
input.readObject();
- //System.out.println("Received traverser [" +
this.clientSocket.getPort() + "]: " + traverser);
+ if (traverser instanceof EmptyTraverser) {
+ stop();
+ break;
+ }
traverserSet.add(traverser);
}
- //System.out.println(this.toString() + ": is complete..." +
counter);
} catch (final EOFException e) {
// okay -- this is how the worker closes
} catch (final IOException | ClassNotFoundException e) {
diff --git
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/structure/data/JTuple2.java
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/structure/data/JTuple2.java
index cf357b4..e8efb9c 100644
---
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/structure/data/JTuple2.java
+++
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/structure/data/JTuple2.java
@@ -18,10 +18,12 @@
*/
package org.apache.tinkerpop.machine.structure.data;
+import java.io.Serializable;
+
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class JTuple2<A, B> implements TTuple2<A, B> {
+public final class JTuple2<A, B> implements TTuple2<A, B>, Serializable {
private final A a;
private final B b;
diff --git
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/structure/data/TMap.java
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/structure/data/TMap.java
index 70cbb52..0a4471b 100644
---
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/structure/data/TMap.java
+++
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/structure/data/TMap.java
@@ -18,12 +18,13 @@
*/
package org.apache.tinkerpop.machine.structure.data;
+import java.io.Serializable;
import java.util.Iterator;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public interface TMap<K, V> {
+public interface TMap<K, V> extends Serializable {
public void set(final K key, final V value);
diff --git
a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
index b984670..6db2e6f 100644
---
a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
+++
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
@@ -28,10 +28,11 @@ import org.apache.tinkerpop.machine.processor.Processor;
import
org.apache.tinkerpop.machine.processor.beam.serialization.TraverserCoder;
import org.apache.tinkerpop.machine.processor.beam.util.ExecutionPlanner;
import org.apache.tinkerpop.machine.processor.beam.util.TopologyUtil;
-import org.apache.tinkerpop.machine.species.io.TraverserServer;
+import org.apache.tinkerpop.machine.species.remote.TraverserServer;
import org.apache.tinkerpop.machine.traverser.Traverser;
import org.apache.tinkerpop.machine.traverser.species.EmptyTraverser;
+import java.util.Collections;
import java.util.Iterator;
/**
@@ -39,20 +40,22 @@ import java.util.Iterator;
*/
public class Beam<C, S, E> implements Processor<C, S, E> {
- public static final int RESULT_SERVER_PORT = 6532; // TODO: this needs to
be a dynamic configuration
public static final int MAX_REPETIONS = 15; // TODO: this needs to be a
dynamic configuration
-
+ private boolean createTraverserServer;
+ private final int traverserServerPort;
private final Pipeline pipeline;
private Iterator<Traverser<C, E>> iterator = null;
- public Beam(final Compilation<C, S, E> compilation) {
+ public Beam(final Compilation<C, S, E> compilation, final String
traverserServerLocation, final int traverserServerPort, final boolean
createTraverserServer) {
+ this.traverserServerPort = traverserServerPort;
+ this.createTraverserServer = createTraverserServer;
+ ///
this.pipeline = Pipeline.create();
this.pipeline.getOptions().setRunner(new
PipelineOptions.DirectRunner().create(this.pipeline.getOptions()));
final PCollection<Traverser<C, S>> source =
this.pipeline.apply(Create.of(EmptyTraverser.instance()));
source.setCoder(new TraverserCoder<>());
final PCollection<Traverser<C, E>> sink = TopologyUtil.compile(source,
compilation);
- sink.apply(ParDo.of(new OutputFn<>("localhost", RESULT_SERVER_PORT)));
-
+ sink.apply(ParDo.of(new OutputFn<>(traverserServerLocation,
this.traverserServerPort)));
}
@Override
@@ -84,14 +87,16 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
return visitor.toString();
}
- private final void setupPipeline() {
+ private void setupPipeline() {
if (null == this.iterator) {
- final TraverserServer<C, E> server = new TraverserServer<>(Beam.RESULT_SERVER_PORT);
- new Thread(server).start();
+ if (this.createTraverserServer) {
+ this.iterator = new TraverserServer<>(this.traverserServerPort);
+ new Thread((TraverserServer) this.iterator).start();
+ } else
+ this.iterator = Collections.emptyIterator();
this.pipeline.run().waitUntilFinish();
- server.stop();
- this.iterator = server;
+ if (this.iterator instanceof TraverserServer)
+ ((TraverserServer<C, E>) this.iterator).stop();
}
}
-
}
diff --git
a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BeamProcessor.java
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BeamProcessor.java
index 87d3481..53ed990 100644
---
a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BeamProcessor.java
+++
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BeamProcessor.java
@@ -19,17 +19,19 @@
package org.apache.tinkerpop.machine.processor.beam;
import org.apache.tinkerpop.machine.bytecode.compiler.BytecodeCompiler;
-import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
import org.apache.tinkerpop.machine.bytecode.compiler.CommonCompiler;
+import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
import org.apache.tinkerpop.machine.bytecode.compiler.CoreCompiler;
import org.apache.tinkerpop.machine.processor.Processor;
import org.apache.tinkerpop.machine.processor.ProcessorFactory;
import org.apache.tinkerpop.machine.processor.beam.strategy.BeamStrategy;
import org.apache.tinkerpop.machine.strategy.Strategy;
-import java.util.Arrays;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
/**
@@ -38,10 +40,30 @@ import java.util.Set;
public class BeamProcessor implements ProcessorFactory {
private static final List<BytecodeCompiler> COMPILERS =
List.of(CoreCompiler.instance(), CommonCompiler.instance());
+ private static final int DEFAULT_SERVER_PORT = 8888;
+ public static final String TRAVERSER_SERVER_LOCATION = "beam.traverserServer.location";
+ public static final String TRAVERSER_SERVER_PORT = "beam.traverserServer.port";
+
+ private final Map<String, Object> configuration;
+
+ public BeamProcessor() {
+ this.configuration = Collections.emptyMap();
+ }
+
+ public BeamProcessor(final Map<String, Object> configuration) {
+ this.configuration = configuration;
+ }
@Override
public <C, S, E> Processor<C, S, E> mint(final Compilation<C, S, E>
compilation) {
- return new Beam<>(compilation);
+ try {
+ return new Beam<>(compilation,
+ (String) this.configuration.getOrDefault(TRAVERSER_SERVER_LOCATION, InetAddress.getLocalHost().getHostAddress()),
+ (Integer) this.configuration.getOrDefault(TRAVERSER_SERVER_PORT, DEFAULT_SERVER_PORT),
+ !this.configuration.containsKey(TRAVERSER_SERVER_LOCATION));
+ } catch (final UnknownHostException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
}
@Override
diff --git
a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/OutputFn.java
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/OutputFn.java
index c270456..ee31a1a 100644
---
a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/OutputFn.java
+++
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/OutputFn.java
@@ -30,14 +30,14 @@ import java.net.Socket;
*/
public class OutputFn<C, S> extends DoFn<Traverser<C, S>, Void> {
- private final String resultServerLocation;
- private final int resultServerPort;
- private Socket resultServer;
+ private final String traverserServerLocation;
+ private final int traverserServerPort;
+ private Socket traverserServerSocket;
private ObjectOutputStream outputStream;
- OutputFn(final String resultServerLocation, final int resultServerPort) {
- this.resultServerLocation = resultServerLocation;
- this.resultServerPort = resultServerPort;
+ OutputFn(final String traverserServerLocation, final int traverserServerPort) {
+ this.traverserServerLocation = traverserServerLocation;
+ this.traverserServerPort = traverserServerPort;
}
@ProcessElement
@@ -52,16 +52,13 @@ public class OutputFn<C, S> extends DoFn<Traverser<C, S>,
Void> {
@StartBundle
public void startBundle() {
// only create a connection if results are generated
- if (null == this.resultServer) {
+ if (null == this.traverserServerSocket) {
try {
- //System.out.println("setting up client: " + this.toString());
- this.resultServer = new Socket(this.resultServerLocation, this.resultServerPort);
- this.outputStream = new ObjectOutputStream(this.resultServer.getOutputStream());
- //System.out.println("Connected to server: " + this.resultServer.toString());
+ this.traverserServerSocket = new Socket(this.traverserServerLocation, this.traverserServerPort);
+ this.outputStream = new ObjectOutputStream(this.traverserServerSocket.getOutputStream());
} catch (final Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
- //System.out.println(this.toString() + " client setup");
}
}
@@ -76,11 +73,10 @@ public class OutputFn<C, S> extends DoFn<Traverser<C, S>,
Void> {
@Teardown
public void stop() {
- if (null != this.resultServer) {
+ if (null != this.traverserServerSocket) {
try {
- // System.out.println(this.toString() + " client stopping");
this.outputStream.flush();
- this.resultServer.close();
+ this.traverserServerSocket.close();
} catch (final Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
diff --git
a/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java
b/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java
index ac28f62..5e00a82 100644
---
a/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java
+++
b/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java
@@ -24,14 +24,17 @@ import org.apache.tinkerpop.language.gremlin.Traversal;
import org.apache.tinkerpop.language.gremlin.TraversalSource;
import org.apache.tinkerpop.language.gremlin.TraversalUtil;
import org.apache.tinkerpop.language.gremlin.core.__;
-import org.apache.tinkerpop.machine.species.LocalMachine;
import org.apache.tinkerpop.machine.Machine;
import org.apache.tinkerpop.machine.coefficient.LongCoefficient;
+import org.apache.tinkerpop.machine.species.LocalMachine;
+import org.apache.tinkerpop.machine.species.remote.MachineServer;
+import org.apache.tinkerpop.machine.species.remote.RemoteMachine;
import org.apache.tinkerpop.machine.strategy.optimization.IdentityStrategy;
import org.apache.tinkerpop.machine.structure.blueprints.BlueprintsStructure;
import org.junit.jupiter.api.Test;
import java.util.List;
+import java.util.Map;
import static org.apache.tinkerpop.language.gremlin.core.__.constant;
import static org.apache.tinkerpop.language.gremlin.core.__.incr;
@@ -66,11 +69,12 @@ public class BeamTest {
}
@Test
- public void shouldWork() {
- final Machine machine = LocalMachine.open();
+ public void shouldWork() throws Exception {
+ final MachineServer server = new MachineServer(7777);
+ final Machine machine = RemoteMachine.open(6666, "localhost", 7777);
final TraversalSource<Long> g = Gremlin.<Long>traversal(machine)
//.withCoefficient(LongCoefficient.class)
- .withProcessor(BeamProcessor.class)
+ .withProcessor(BeamProcessor.class, Map.of("beam.traverserServer.location", "localhost", "beam.traverserServer.port", 6666))
.withStrategy(IdentityStrategy.class);
Traversal<Long, ?, ?> traversal = g.inject(List.of(1L,
1L)).<Long>unfold().map(incr()).c(4L).repeat(incr()).until(__.is(__.constant(8L).incr().incr())).sum();
@@ -124,5 +128,7 @@ public class BeamTest {
System.out.println(traversal.toList());
g.close();
+ machine.close();
+ server.close();
}
}
diff --git
a/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java
b/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java
index a9ef4b7..5445757 100644
---
a/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java
+++
b/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java
@@ -24,9 +24,11 @@ import org.apache.tinkerpop.language.gremlin.Traversal;
import org.apache.tinkerpop.language.gremlin.TraversalSource;
import org.apache.tinkerpop.language.gremlin.TraversalUtil;
import org.apache.tinkerpop.language.gremlin.common.__;
-import org.apache.tinkerpop.machine.species.LocalMachine;
import org.apache.tinkerpop.machine.Machine;
import org.apache.tinkerpop.machine.coefficient.LongCoefficient;
+import org.apache.tinkerpop.machine.species.LocalMachine;
+import org.apache.tinkerpop.machine.species.remote.MachineServer;
+import org.apache.tinkerpop.machine.species.remote.RemoteMachine;
import org.apache.tinkerpop.machine.strategy.optimization.IdentityStrategy;
import org.apache.tinkerpop.machine.structure.blueprints.BlueprintsStructure;
import org.apache.tinkerpop.machine.structure.data.JMap;
@@ -61,7 +63,8 @@ public class PipesTest {
@Test
public void shouldWork() {
- final Machine machine = LocalMachine.open();
+ final MachineServer server = new MachineServer(7777);
+ final Machine machine = RemoteMachine.open(6666, "localhost", 7777);
final TraversalSource<Long> g = Gremlin.<Long>traversal(machine)
.withCoefficient(LongCoefficient.class)
.withProcessor(PipesProcessor.class)
@@ -120,9 +123,14 @@ public class PipesTest {
traversal = g.inject(7L, 7L, 7L, 2L).incr().barrier();
System.out.println(TraversalUtil.getBytecode(traversal));
System.out.println(traversal);
+ System.out.println(traversal.hasNext());
System.out.println(traversal.nextTraverser());
+ System.out.println(traversal.hasNext());
System.out.println(traversal.nextTraverser());
System.out.println(traversal.hasNext());
+ ///
+ g.close();
+ server.close();
}