Repository: tinkerpop Updated Branches: refs/heads/tp32 d42d54d6a -> 3df6c5806
Apply edgeFunction in SparkMessenger Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/744c4ecb Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/744c4ecb Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/744c4ecb Branch: refs/heads/tp32 Commit: 744c4ecbc042acdc481045d6df5077d52d370ac9 Parents: 92a09d8 Author: zhuchenchen <zhuchenc...@didichuxing.com> Authored: Mon Jan 22 11:15:41 2018 +0800 Committer: zhuchenchen <zhuchenc...@didichuxing.com> Committed: Mon Jan 22 11:15:41 2018 +0800 ---------------------------------------------------------------------- .../spark/process/computer/SparkMessenger.java | 2 +- .../process/computer/SparkMessengerTest.java | 86 ++++++++++++++++++++ 2 files changed, 87 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/744c4ecb/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java index aab7ecd..53a755c 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java @@ -63,7 +63,7 @@ public final class SparkMessenger<M> implements Messenger<M> { final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope; final Traversal.Admin<Vertex, Edge> incidentTraversal = SparkMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get().asAdmin(), this.vertex); final Direction direction = SparkMessenger.getOppositeDirection(incidentTraversal); - incidentTraversal.forEachRemaining(edge -> this.outgoingMessages.add(new Tuple2<>(edge.vertices(direction).next().id(), message))); + incidentTraversal.forEachRemaining(edge -> this.outgoingMessages.add(new Tuple2<>(edge.vertices(direction).next().id(), localMessageScope.getEdgeFunction().apply(message, edge)))); } else { ((MessageScope.Global) messageScope).vertices().forEach(v -> this.outgoingMessages.add(new Tuple2<>(v.id(), message))); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/744c4ecb/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessengerTest.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessengerTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessengerTest.java new file mode 100644 index 0000000..c280ab2 --- /dev/null +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessengerTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.spark.process.computer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.tinkerpop.gremlin.process.computer.MessageScope; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; +import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest; +import org.apache.tinkerpop.gremlin.structure.Edge; +import org.apache.tinkerpop.gremlin.structure.T; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph; +import org.junit.Assert; +import org.junit.Test; +import scala.Tuple2; + +import java.util.Arrays; +import java.util.List; +import java.util.function.BiFunction; + +/** + * @author Dean Zhu + */ +public class SparkMessengerTest extends AbstractSparkTest { + private static ObjectMapper objectmapper = new ObjectMapper(); + + @Test + public void testSparkMessenger() throws Exception { + // Define scopes + final MessageScope.Local<String> orderSrcMessageScope = MessageScope.Local + .of(__::inE, new BiFunction<String, Edge, String>() { + @Override + public String apply(String message, Edge edge) { + System.out.println(edge); + if ("mocked_edge_label1".equals(edge.label())) { + return message; + } + return null; + } + }); + final MessageScope.Local<String> inMessageScope = MessageScope.Local.of(__::inE); + + // Define star graph + final StarGraph starGraph = StarGraph.open(); + Object[] vertex0Array = new Object[]{T.id, 0, T.label, "mocked_vertex_label1"}; + Object[] vertex1Array = new Object[]{T.id, 1, T.label, "mocked_vertex_label2"}; + Object[] vertex2Array = new Object[]{T.id, 2, T.label, "mocked_vertex_label2"}; + Vertex vertex0 = starGraph.addVertex(vertex0Array); + Vertex vertex1 = starGraph.addVertex(vertex1Array); + Vertex vertex2 = starGraph.addVertex(vertex2Array); + vertex1.addEdge("mocked_edge_label1", vertex0); + vertex2.addEdge("mocked_edge_label2", vertex0); + + // Create Spark Messenger + final SparkMessenger<String> messenger = new SparkMessenger<>(); + final List<String> incomingMessages = Arrays.asList("a", "b", "c"); + messenger.setVertexAndIncomingMessages(vertex0, incomingMessages); + + messenger.sendMessage(orderSrcMessageScope, "a"); + List<Tuple2<Object, String>> outgoingMessages0 = messenger.getOutgoingMessages(); + System.out.println(objectmapper.writeValueAsString(outgoingMessages0)); + + Assert.assertEquals("a", outgoingMessages0.get(0)._2()); + Assert.assertNull(outgoingMessages0.get(1)._2()); + //messenger.sendMessage(inMessageScope, "a"); + //List<Tuple2<Object, String>> outgoingMessages1 = messenger.getOutgoingMessages(); + //System.out.println(objectmapper.writeValueAsString(outgoingMessages1)); + } +} \ No newline at end of file