Adding simple graph computer test of proper message passing in all 3 directions


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/80c0b846
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/80c0b846
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/80c0b846

Branch: refs/heads/TINKERPOP-1862
Commit: 80c0b8465fe9bb6ddbe4522a36304c7ba054e909
Parents: f02ea33
Author: Graff, Philip B <philip.gr...@jhuapl.edu>
Authored: Sat Jan 13 21:32:29 2018 -0500
Committer: Graff, Philip B <philip.gr...@jhuapl.edu>
Committed: Sun Feb 25 10:44:15 2018 -0500

----------------------------------------------------------------------
 .../process/computer/GraphComputerTest.java     | 175 +++++++++++++++++++
 1 file changed, 175 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/80c0b846/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git 
a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
 
b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 8c846d5..c34c2dc 100644
--- 
a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ 
b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -34,6 +34,7 @@ import 
org.apache.tinkerpop.gremlin.process.traversal.Operator;
 import org.apache.tinkerpop.gremlin.process.traversal.P;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyPath;
 import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException;
@@ -2683,4 +2684,178 @@ public class GraphComputerTest extends 
AbstractGremlinProcessTest {
             };
         }
     }
+
+    ///////////////////////////////////
+
+    @Test
+    public void testMessagePassingIn() throws Exception {
+        runTest(Direction.IN).forEachRemaining(v -> {
+            String in = v.value("propin").toString();
+            if (in.equals("a")) {
+                assertEquals("ab", v.value("propout").toString());
+            } else {
+                assertEquals("", v.value("propout").toString());
+            }
+        });
+    }
+
+    @Test
+    public void testMessagePassingOut() throws Exception {
+        runTest(Direction.OUT).forEachRemaining(v -> {
+            String in = v.value("propin").toString();
+            if (in.equals("a")) {
+                assertEquals("a", v.value("propout").toString());
+            } else {
+                assertEquals("a", v.value("propout").toString());
+            }
+        });
+    }
+
+    @Test
+    public void testMessagePassingBoth() throws Exception {
+        runTest(Direction.BOTH).forEachRemaining(v -> {
+            String in = v.value("propin").toString();
+            if (in.equals("a")) {
+                assertEquals("aab", v.value("propout").toString());
+            } else {
+                assertEquals("a", v.value("propout").toString());
+            }
+        });
+    }
+
+    private GraphTraversal<Vertex, Vertex> runTest(Direction direction) throws 
Exception {
+        g.addV().property("propin", "a").as("a")
+                .addV().property("propin", "b").as("b")
+                
.addE("edge").from("a").to("b").addE("edge").from("a").to("a").iterate();
+        final VertexProgramR svp = VertexProgramR.build().propertyIn("propin")
+                .propertyOut("propout").direction(direction).create();
+        final ComputerResult result = 
graphProvider.getGraphComputer(graph).program(svp).submit().get();
+        return result.graph().traversal().V();
+    }
+
+    private static class VertexProgramR implements VertexProgram<String> {
+        private static final String SIMPLE_VERTEX_PROGRAM_CFG_PREFIX = 
"gremlin.simpleVertexProgram";
+        private static final String PROPERTY_OUT_CFG_KEY = 
SIMPLE_VERTEX_PROGRAM_CFG_PREFIX + ".propertyout";
+        private static final String PROPERTY_IN_CFG_KEY = 
SIMPLE_VERTEX_PROGRAM_CFG_PREFIX + ".propertyin";
+        private static final String DIRECTION_CFG_KEY = 
SIMPLE_VERTEX_PROGRAM_CFG_PREFIX + ".direction";
+
+        private final MessageScope.Local<String> inMessageScope = 
MessageScope.Local.of(__::inE);
+        private final MessageScope.Local<String> outMessageScope = 
MessageScope.Local.of(__::outE);
+        private final MessageScope.Local<String> bothMessageScope = 
MessageScope.Local.of(__::bothE);
+        private MessageScope.Local<String> messageScope;
+        private Set<VertexComputeKey> vertexComputeKeys;
+
+        private String propertyout, propertyin;
+
+        /**
+         * Clones this vertex program.
+         *
+         * @return a clone of this vertex program
+         */
+        public VertexProgramR clone() { return this; }
+
+        @Override
+        public void loadState(final Graph graph, final Configuration 
configuration) {
+            this.propertyout = configuration.getString(PROPERTY_OUT_CFG_KEY);
+            this.propertyin = configuration.getString(PROPERTY_IN_CFG_KEY);
+            Direction direction = 
Direction.valueOf(configuration.getString(DIRECTION_CFG_KEY));
+            switch (direction) {
+                case IN:
+                    this.messageScope = this.inMessageScope;
+                    break;
+                case OUT:
+                    this.messageScope = this.outMessageScope;
+                    break;
+                default:
+                    this.messageScope = this.bothMessageScope;
+                    break;
+            }
+            this.vertexComputeKeys = new 
HashSet<>(Arrays.asList(VertexComputeKey.of(this.propertyout, false),
+                    VertexComputeKey.of(this.propertyin, false)));
+        }
+
+        @Override
+        public void setup(Memory memory) {
+        }
+
+        @Override
+        public void execute(Vertex vertex, Messenger<String> messenger, Memory 
memory) {
+            if (memory.isInitialIteration()) {
+                messenger.sendMessage(this.messageScope, 
vertex.value(this.propertyin).toString());
+            } else {
+                char[] composite = 
IteratorUtils.reduce(messenger.receiveMessages(), "", (a, b) -> a + 
b).toCharArray();
+                Arrays.sort(composite);
+                vertex.property(this.propertyout, new String(composite));
+            }
+        }
+
+        @Override
+        public boolean terminate(Memory memory) {
+            return !memory.isInitialIteration();
+        }
+
+        @Override
+        public Set<MessageScope> getMessageScopes(Memory memory) {
+            return Collections.singleton(this.messageScope);
+        }
+
+        @Override
+        public GraphComputer.ResultGraph getPreferredResultGraph() {
+            return GraphComputer.ResultGraph.NEW;
+        }
+
+        @Override
+        public GraphComputer.Persist getPreferredPersist() {
+            return GraphComputer.Persist.VERTEX_PROPERTIES;
+        }
+
+        @Override
+        public Set<VertexComputeKey> getVertexComputeKeys() {
+            return this.vertexComputeKeys;
+        }
+
+        @Override
+        public Set<MemoryComputeKey> getMemoryComputeKeys() {
+            return Collections.emptySet();
+        }
+
+        public static Builder build() {
+            return new Builder();
+        }
+
+        static class Builder extends AbstractVertexProgramBuilder<Builder> {
+
+            private Builder() {
+                super(VertexProgramR.class);
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public VertexProgramR create(final Graph graph) {
+                if (graph != null) {
+                    
ConfigurationUtils.append(graph.configuration().subset(SIMPLE_VERTEX_PROGRAM_CFG_PREFIX),
 configuration);
+                }
+                return (VertexProgramR) 
VertexProgram.createVertexProgram(graph, configuration);
+            }
+
+            public VertexProgramR create() {
+                return create(null);
+            }
+
+            public Builder propertyOut(final String name) {
+                configuration.setProperty(PROPERTY_OUT_CFG_KEY, name);
+                return this;
+            }
+
+            public Builder propertyIn(final String name) {
+                configuration.setProperty(PROPERTY_IN_CFG_KEY, name);
+                return this;
+            }
+
+            public Builder direction(final Direction direction) {
+                configuration.setProperty(DIRECTION_CFG_KEY, 
direction.toString());
+                return this;
+            }
+        }
+    }
 }

Reply via email to