[ https://issues.apache.org/jira/browse/TINKERPOP-1519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Felix Chapman updated TINKERPOP-1519:
-------------------------------------
Description:
When a VertexProgram sends messages on multiple MessageScopes within a single
iteration, the messages behave as if they were sent on all of those scopes.
e.g. if you send message {{A}} on {{out}} edges and message {{B}} on {{in}}
edges, then both {{A}} and {{B}} are instead sent over both {{in}} and {{out}}
edges.
The problem can be worked around by using only a single MessageScope per
iteration, but this increases the number of iterations (see the workaround
sketch after the example below).
An example of this behaviour is below:
{code:java}
import com.google.common.collect.Sets;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
import org.apache.tinkerpop.gremlin.process.computer.Messenger;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutionException;

public class TinkerTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        TinkerGraph graph = TinkerGraph.open();
        Vertex a = graph.addVertex("a");
        Vertex b = graph.addVertex("b");
        Vertex c = graph.addVertex("c");
        a.addEdge("edge", b);
        b.addEdge("edge", c);

        // Simple graph:
        // a -> b -> c

        // Execute a vertex program that sends a message of "2" over incoming edges
        // and a message of "1" over outgoing edges from "b",
        // then each vertex sums any received messages
        ComputerResult result = graph.compute().program(new MyVertexProgram()).submit().get();

        // We expect the results to be {a=2, b=0, c=1}. Instead it is {a=3, b=0, c=3}
        System.out.println(result.graph().traversal().V().group().by(Element::label).by("count").next());
    }
}

class MyVertexProgram implements VertexProgram<Long> {

    private final MessageScope.Local<Long> countMessageScopeIn = MessageScope.Local.of(__::inE);
    private final MessageScope.Local<Long> countMessageScopeOut = MessageScope.Local.of(__::outE);

    private static final String MEMORY_KEY = "count";

    private static final Set<String> COMPUTE_KEYS = Collections.singleton(MEMORY_KEY);

    @Override
    public void setup(final Memory memory) {}

    @Override
    public GraphComputer.Persist getPreferredPersist() {
        return GraphComputer.Persist.VERTEX_PROPERTIES;
    }

    @Override
    public Set<String> getElementComputeKeys() {
        return COMPUTE_KEYS;
    }

    @Override
    public Set<MessageScope> getMessageScopes(final Memory memory) {
        return Sets.newHashSet(countMessageScopeIn, countMessageScopeOut);
    }

    @Override
    public void execute(Vertex vertex, Messenger<Long> messenger, Memory memory) {
        switch (memory.getIteration()) {
            case 0:
                if (vertex.label().equals("b")) {
                    messenger.sendMessage(this.countMessageScopeIn, 2L);
                    messenger.sendMessage(this.countMessageScopeOut, 1L);
                }
                break;
            case 1:
                long edgeCount = IteratorUtils.reduce(messenger.receiveMessages(), 0L, (a, b) -> a + b);
                vertex.property(MEMORY_KEY, edgeCount);
                break;
        }
    }

    @Override
    public boolean terminate(final Memory memory) {
        return memory.getIteration() == 1;
    }

    @Override
    public GraphComputer.ResultGraph getPreferredResultGraph() {
        return GraphComputer.ResultGraph.NEW;
    }

    @Override
    public MyVertexProgram clone() {
        try {
            return (MyVertexProgram) super.clone();
        } catch (final CloneNotSupportedException e) {
            throw new RuntimeException(e);
        }
    }
}
{code}
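A possible shape for the single-scope-per-iteration workaround mentioned above is sketched here (this is a minimal, untested sketch, not part of the original report): iteration 0 sends only on the {{in}} scope, iteration 1 stores what arrived and sends only on the {{out}} scope, and iteration 2 adds the second round of messages. The scope set returned for the final iteration and the exact delivery semantics are assumptions; the cost is the extra iteration noted above. With correct per-scope delivery this should yield {a=2, b=0, c=1}.
{code:java}
// Workaround sketch (assumption, not from the original report): one MessageScope per iteration.
@Override
public Set<MessageScope> getMessageScopes(final Memory memory) {
    // Declare only the scope that is actually used for sending in each iteration.
    switch (memory.getIteration()) {
        case 0:  return Collections.<MessageScope>singleton(countMessageScopeIn);
        case 1:  return Collections.<MessageScope>singleton(countMessageScopeOut);
        default: return Sets.newHashSet(countMessageScopeIn, countMessageScopeOut); // receiving only
    }
}

@Override
public void execute(Vertex vertex, Messenger<Long> messenger, Memory memory) {
    switch (memory.getIteration()) {
        case 0:
            // First round: send only over the incoming edges of "b".
            if (vertex.label().equals("b"))
                messenger.sendMessage(this.countMessageScopeIn, 2L);
            break;
        case 1:
            // Store what arrived from the first round, then send only over the outgoing edges of "b".
            long received = IteratorUtils.reduce(messenger.receiveMessages(), 0L, (x, y) -> x + y);
            vertex.property(MEMORY_KEY, received);
            if (vertex.label().equals("b"))
                messenger.sendMessage(this.countMessageScopeOut, 1L);
            break;
        case 2:
            // Add the second round of messages to the running total.
            long total = vertex.<Long>property(MEMORY_KEY).orElse(0L)
                    + IteratorUtils.reduce(messenger.receiveMessages(), 0L, (x, y) -> x + y);
            vertex.property(MEMORY_KEY, total);
            break;
    }
}

@Override
public boolean terminate(final Memory memory) {
    return memory.getIteration() == 2; // one extra iteration compared to the original program
}
{code}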
> TinkerGraphComputer doesn't handle multiple MessageScopes in single iteration
> -----------------------------------------------------------------------------
>
> Key: TINKERPOP-1519
> URL: https://issues.apache.org/jira/browse/TINKERPOP-1519
> Project: TinkerPop
> Issue Type: Bug
> Components: tinkergraph
> Affects Versions: 3.1.1-incubating
> Environment: Mac OSX
> Reporter: Felix Chapman
> Priority: Minor
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)