Repository: tinkerpop
Updated Branches:
  refs/heads/tp32 d42d54d6a -> 3df6c5806


Apply edgeFunction in SparkMessenger


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/744c4ecb
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/744c4ecb
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/744c4ecb

Branch: refs/heads/tp32
Commit: 744c4ecbc042acdc481045d6df5077d52d370ac9
Parents: 92a09d8
Author: zhuchenchen <zhuchenc...@didichuxing.com>
Authored: Mon Jan 22 11:15:41 2018 +0800
Committer: zhuchenchen <zhuchenc...@didichuxing.com>
Committed: Mon Jan 22 11:15:41 2018 +0800

----------------------------------------------------------------------
 .../spark/process/computer/SparkMessenger.java  |  2 +-
 .../process/computer/SparkMessengerTest.java    | 86 ++++++++++++++++++++
 2 files changed, 87 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/744c4ecb/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
index aab7ecd..53a755c 100644
--- 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
@@ -63,7 +63,7 @@ public final class SparkMessenger<M> implements Messenger<M> {
             final MessageScope.Local<M> localMessageScope = 
(MessageScope.Local) messageScope;
             final Traversal.Admin<Vertex, Edge> incidentTraversal = 
SparkMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get().asAdmin(),
 this.vertex);
             final Direction direction = 
SparkMessenger.getOppositeDirection(incidentTraversal);
-            incidentTraversal.forEachRemaining(edge -> 
this.outgoingMessages.add(new Tuple2<>(edge.vertices(direction).next().id(), 
message)));
+            incidentTraversal.forEachRemaining(edge -> 
this.outgoingMessages.add(new Tuple2<>(edge.vertices(direction).next().id(), 
localMessageScope.getEdgeFunction().apply(message, edge))));
         } else {
             ((MessageScope.Global) messageScope).vertices().forEach(v -> 
this.outgoingMessages.add(new Tuple2<>(v.id(), message)));
         }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/744c4ecb/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessengerTest.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessengerTest.java
 
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessengerTest.java
new file mode 100644
index 0000000..c280ab2
--- /dev/null
+++ 
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessengerTest.java
@@ -0,0 +1,86 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+import org.junit.Assert;
+import org.junit.Test;
+import scala.Tuple2;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.BiFunction;
+
+/**
+ * @author Dean Zhu
+ */
+public class SparkMessengerTest extends AbstractSparkTest {
+    private static ObjectMapper objectmapper = new ObjectMapper();
+
+    @Test
+    public void testSparkMessenger() throws Exception {
+        // Define scopes
+        final MessageScope.Local<String> orderSrcMessageScope = 
MessageScope.Local
+                .of(__::inE, new BiFunction<String, Edge, String>() {
+                    @Override
+                    public String apply(String message, Edge edge) {
+                        System.out.println(edge);
+                        if ("mocked_edge_label1".equals(edge.label())) {
+                            return message;
+                        }
+                        return null;
+                    }
+                });
+        final MessageScope.Local<String> inMessageScope = 
MessageScope.Local.of(__::inE);
+
+        // Define star graph
+        final StarGraph starGraph = StarGraph.open();
+        Object[] vertex0Array = new Object[]{T.id, 0, T.label, 
"mocked_vertex_label1"};
+        Object[] vertex1Array = new Object[]{T.id, 1, T.label, 
"mocked_vertex_label2"};
+        Object[] vertex2Array = new Object[]{T.id, 2, T.label, 
"mocked_vertex_label2"};
+        Vertex vertex0 = starGraph.addVertex(vertex0Array);
+        Vertex vertex1 = starGraph.addVertex(vertex1Array);
+        Vertex vertex2 = starGraph.addVertex(vertex2Array);
+        vertex1.addEdge("mocked_edge_label1", vertex0);
+        vertex2.addEdge("mocked_edge_label2", vertex0);
+
+        // Create Spark Messenger
+        final SparkMessenger<String> messenger = new SparkMessenger<>();
+        final List<String> incomingMessages = Arrays.asList("a", "b", "c");
+        messenger.setVertexAndIncomingMessages(vertex0, incomingMessages);
+
+        messenger.sendMessage(orderSrcMessageScope, "a");
+        List<Tuple2<Object, String>> outgoingMessages0 = 
messenger.getOutgoingMessages();
+        System.out.println(objectmapper.writeValueAsString(outgoingMessages0));
+
+        Assert.assertEquals("a", outgoingMessages0.get(0)._2());
+        Assert.assertNull(outgoingMessages0.get(1)._2());
+        //messenger.sendMessage(inMessageScope, "a");
+        //List<Tuple2<Object, String>> outgoingMessages1 = 
messenger.getOutgoingMessages();
+        
//System.out.println(objectmapper.writeValueAsString(outgoingMessages1));
+    }
+}
\ No newline at end of file

Reply via email to