[ https://issues.apache.org/jira/browse/TINKERPOP-1519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Felix Chapman updated TINKERPOP-1519: ------------------------------------- Description: When executing a VertexProgram that sends messages on multiple MessageScopes in a single iteration, then the messages behave as if they were sent on all scopes within that iteration. e.g. if you send message {{A}} on {{out}} edges, and message {{B}} on {{in}} edges, then {{A}} and {{B}} will instead be sent over both {{in}} and {{out}} edges. The problem can be resolved by using only a single MessageScope per iteration, but this involves increasing the number of iterations. An example of this behaviour is below: {code:java} public class TinkerTest { public static void main(String[] args) throws ExecutionException, InterruptedException { TinkerGraph graph = TinkerGraph.open(); Vertex a = graph.addVertex("a"); Vertex b = graph.addVertex("b"); Vertex c = graph.addVertex("c"); a.addEdge("edge", b); b.addEdge("edge", c); // Simple graph: // a -> b -> c // Execute a traversal program that sends an incoming message of "2" and an outgoing message of "1" from "b" // then each vertex sums any received messages ComputerResult result = graph.compute().program(new MyVertexProgram()).submit().get(); // We expect the results to be {a=2, b=0, c=1}. Instead it is {a=3, b=0, c=3} System.out.println(result.graph().traversal().V().group().by(Element::label).by("count").next()); } } class MyVertexProgram implements VertexProgram<Long> { private final MessageScope.Local<Long> countMessageScopeIn = MessageScope.Local.of(__::inE); private final MessageScope.Local<Long> countMessageScopeOut = MessageScope.Local.of(__::outE); private static final String MEMORY_KEY = "count"; private static final Set<String> COMPUTE_KEYS = Collections.singleton(MEMORY_KEY); @Override public void setup(final Memory memory) {} @Override public GraphComputer.Persist getPreferredPersist() { return GraphComputer.Persist.VERTEX_PROPERTIES; } @Override public Set<String> getElementComputeKeys() { return COMPUTE_KEYS; } @Override public Set<MessageScope> getMessageScopes(final Memory memory) { return Sets.newHashSet(countMessageScopeIn, countMessageScopeOut); } @Override public void execute(Vertex vertex, Messenger<Long> messenger, Memory memory) { switch (memory.getIteration()) { case 0: if (vertex.label().equals("b")) { messenger.sendMessage(this.countMessageScopeIn, 2L); messenger.sendMessage(this.countMessageScopeOut, 1L); } break; case 1: long edgeCount = IteratorUtils.reduce(messenger.receiveMessages(), 0L, (a, b) -> a + b); vertex.property(MEMORY_KEY, edgeCount); break; } } @Override public boolean terminate(final Memory memory) { return memory.getIteration() == 1; } @Override public GraphComputer.ResultGraph getPreferredResultGraph() { return GraphComputer.ResultGraph.NEW; } @Override public MyVertexProgram clone() { try { return (MyVertexProgram) super.clone(); } catch (final CloneNotSupportedException e) { throw new RuntimeException(e); } } } {code} was: When executing a VertexProgram that sends messages on multiple MessageScopes in a single iteration, then the messages behave as if they were sent on all scopes within that iteration. The problem can be resolved by using only a single MessageScope per iteration, but this involves increasing the number of iterations. An example of this behaviour is below: {code:java} public class TinkerTest { public static void main(String[] args) throws ExecutionException, InterruptedException { TinkerGraph graph = TinkerGraph.open(); Vertex a = graph.addVertex("a"); Vertex b = graph.addVertex("b"); Vertex c = graph.addVertex("c"); a.addEdge("edge", b); b.addEdge("edge", c); // Simple graph: // a -> b -> c // Execute a traversal program that sends an incoming message of "2" and an outgoing message of "1" from "b" // then each vertex sums any received messages ComputerResult result = graph.compute().program(new MyVertexProgram()).submit().get(); // We expect the results to be {a=2, b=0, c=1}. Instead it is {a=3, b=0, c=3} System.out.println(result.graph().traversal().V().group().by(Element::label).by("count").next()); } } class MyVertexProgram implements VertexProgram<Long> { private final MessageScope.Local<Long> countMessageScopeIn = MessageScope.Local.of(__::inE); private final MessageScope.Local<Long> countMessageScopeOut = MessageScope.Local.of(__::outE); private static final String MEMORY_KEY = "count"; private static final Set<String> COMPUTE_KEYS = Collections.singleton(MEMORY_KEY); @Override public void setup(final Memory memory) {} @Override public GraphComputer.Persist getPreferredPersist() { return GraphComputer.Persist.VERTEX_PROPERTIES; } @Override public Set<String> getElementComputeKeys() { return COMPUTE_KEYS; } @Override public Set<MessageScope> getMessageScopes(final Memory memory) { return Sets.newHashSet(countMessageScopeIn, countMessageScopeOut); } @Override public void execute(Vertex vertex, Messenger<Long> messenger, Memory memory) { switch (memory.getIteration()) { case 0: if (vertex.label().equals("b")) { messenger.sendMessage(this.countMessageScopeIn, 2L); messenger.sendMessage(this.countMessageScopeOut, 1L); } break; case 1: long edgeCount = IteratorUtils.reduce(messenger.receiveMessages(), 0L, (a, b) -> a + b); vertex.property(MEMORY_KEY, edgeCount); break; } } @Override public boolean terminate(final Memory memory) { return memory.getIteration() == 1; } @Override public GraphComputer.ResultGraph getPreferredResultGraph() { return GraphComputer.ResultGraph.NEW; } @Override public MyVertexProgram clone() { try { return (MyVertexProgram) super.clone(); } catch (final CloneNotSupportedException e) { throw new RuntimeException(e); } } } {code} > TinkerGraphComputer doesn't handle multiple MessageScopes in single iteration > ----------------------------------------------------------------------------- > > Key: TINKERPOP-1519 > URL: https://issues.apache.org/jira/browse/TINKERPOP-1519 > Project: TinkerPop > Issue Type: Bug > Components: tinkergraph > Affects Versions: 3.1.1-incubating > Environment: Mac OSX > Reporter: Felix Chapman > Priority: Minor > > When executing a VertexProgram that sends messages on multiple MessageScopes > in a single iteration, then the messages behave as if they were sent on all > scopes within that iteration. > e.g. if you send message {{A}} on {{out}} edges, and message {{B}} on {{in}} > edges, then {{A}} and {{B}} will instead be sent over both {{in}} and {{out}} > edges. > The problem can be resolved by using only a single MessageScope per > iteration, but this involves increasing the number of iterations. > An example of this behaviour is below: > {code:java} > public class TinkerTest { > public static void main(String[] args) throws ExecutionException, > InterruptedException { > TinkerGraph graph = TinkerGraph.open(); > Vertex a = graph.addVertex("a"); > Vertex b = graph.addVertex("b"); > Vertex c = graph.addVertex("c"); > a.addEdge("edge", b); > b.addEdge("edge", c); > // Simple graph: > // a -> b -> c > // Execute a traversal program that sends an incoming message of "2" > and an outgoing message of "1" from "b" > // then each vertex sums any received messages > ComputerResult result = graph.compute().program(new > MyVertexProgram()).submit().get(); > // We expect the results to be {a=2, b=0, c=1}. Instead it is {a=3, > b=0, c=3} > > System.out.println(result.graph().traversal().V().group().by(Element::label).by("count").next()); > } > } > class MyVertexProgram implements VertexProgram<Long> { > private final MessageScope.Local<Long> countMessageScopeIn = > MessageScope.Local.of(__::inE); > private final MessageScope.Local<Long> countMessageScopeOut = > MessageScope.Local.of(__::outE); > private static final String MEMORY_KEY = "count"; > private static final Set<String> COMPUTE_KEYS = > Collections.singleton(MEMORY_KEY); > @Override > public void setup(final Memory memory) {} > @Override > public GraphComputer.Persist getPreferredPersist() { > return GraphComputer.Persist.VERTEX_PROPERTIES; > } > @Override > public Set<String> getElementComputeKeys() { > return COMPUTE_KEYS; > } > @Override > public Set<MessageScope> getMessageScopes(final Memory memory) { > return Sets.newHashSet(countMessageScopeIn, countMessageScopeOut); > } > @Override > public void execute(Vertex vertex, Messenger<Long> messenger, Memory > memory) { > switch (memory.getIteration()) { > case 0: > if (vertex.label().equals("b")) { > messenger.sendMessage(this.countMessageScopeIn, 2L); > messenger.sendMessage(this.countMessageScopeOut, 1L); > } > break; > case 1: > long edgeCount = > IteratorUtils.reduce(messenger.receiveMessages(), 0L, (a, b) -> a + b); > vertex.property(MEMORY_KEY, edgeCount); > break; > } > } > @Override > public boolean terminate(final Memory memory) { > return memory.getIteration() == 1; > } > @Override > public GraphComputer.ResultGraph getPreferredResultGraph() { > return GraphComputer.ResultGraph.NEW; > } > @Override > public MyVertexProgram clone() { > try { > return (MyVertexProgram) super.clone(); > } catch (final CloneNotSupportedException e) { > throw new RuntimeException(e); > } > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)