[ 
https://issues.apache.org/jira/browse/TINKERPOP-1519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Felix Chapman updated TINKERPOP-1519:
-------------------------------------
    Description: 
When executing a VertexProgram that sends messages on multiple MessageScopes in 
a single iteration, then the messages behave as if they were sent on all scopes 
within that iteration.

e.g. if you send message {{A}} on {{out}} edges, and message {{B}} on {{in}} 
edges, then {{A}} and {{B}} will instead be sent over both {{in}} and {{out}} 
edges.

The problem can be worked around by using only a single MessageScope per 
iteration, but this increases the number of iterations required.

An example of this behaviour is below:

{code:java}
public class TinkerTest {

    public static void main(String[] args) throws ExecutionException, 
InterruptedException {
        TinkerGraph graph = TinkerGraph.open();
        Vertex a = graph.addVertex("a");
        Vertex b = graph.addVertex("b");
        Vertex c = graph.addVertex("c");
        a.addEdge("edge", b);
        b.addEdge("edge", c);

        // Simple graph:
        // a -> b -> c

        // Execute a traversal program that sends an incoming message of "2" 
and an outgoing message of "1" from "b"
        // then each vertex sums any received messages
        ComputerResult result = graph.compute().program(new 
MyVertexProgram()).submit().get();

        // We expect the results to be {a=2, b=0, c=1}. Instead it is {a=3, 
b=0, c=3}
        
System.out.println(result.graph().traversal().V().group().by(Element::label).by("count").next());
    }
}

/**
 * Two-iteration vertex program used to reproduce TINKERPOP-1519.
 *
 * <p>In iteration 0 the vertex labelled "b" sends {@code 2L} on the scope
 * built from {@code __::inE} and {@code 1L} on the scope built from
 * {@code __::outE}. In iteration 1 every vertex sums the messages it
 * received and stores the sum in its "count" property.
 */
class MyVertexProgram implements VertexProgram<Long> {

    /** Name of the vertex property that receives the summed messages. */
    private static final String MEMORY_KEY = "count";

    /** The only element compute key this program declares. */
    private static final Set<String> COMPUTE_KEYS =
            Collections.singleton(MEMORY_KEY);

    /** Local scope created from {@code __::inE}; carries the value 2. */
    private final MessageScope.Local<Long> countMessageScopeIn =
            MessageScope.Local.of(__::inE);

    /** Local scope created from {@code __::outE}; carries the value 1. */
    private final MessageScope.Local<Long> countMessageScopeOut =
            MessageScope.Local.of(__::outE);

    /** No memory initialisation is needed for this program. */
    @Override
    public void setup(final Memory memory) {
    }

    /** @return {@link GraphComputer.Persist#VERTEX_PROPERTIES} */
    @Override
    public GraphComputer.Persist getPreferredPersist() {
        return GraphComputer.Persist.VERTEX_PROPERTIES;
    }

    /** @return the singleton set containing the "count" key */
    @Override
    public Set<String> getElementComputeKeys() {
        return COMPUTE_KEYS;
    }

    /**
     * Declares both scopes for every iteration.
     *
     * @param memory unused
     * @return a new set holding the in-edge and out-edge scopes
     */
    @Override
    public Set<MessageScope> getMessageScopes(final Memory memory) {
        return Sets.newHashSet(countMessageScopeIn, countMessageScopeOut);
    }

    /**
     * Sends the two messages from "b" in iteration 0 and stores the sum of
     * received messages on each vertex in iteration 1; does nothing in any
     * other iteration.
     *
     * @param vertex    the vertex being processed
     * @param messenger used to send and receive the {@code Long} messages
     * @param memory    supplies the current iteration number
     */
    @Override
    public void execute(Vertex vertex, Messenger<Long> messenger,
                        Memory memory) {
        final int iteration = memory.getIteration();

        if (iteration == 0) {
            // Only "b" is a sender; every other vertex stays silent.
            if (!vertex.label().equals("b")) {
                return;
            }
            messenger.sendMessage(this.countMessageScopeIn, 2L);
            messenger.sendMessage(this.countMessageScopeOut, 1L);
            return;
        }

        if (iteration == 1) {
            // Fold all received messages into one sum, starting from zero.
            final long total = IteratorUtils.reduce(
                    messenger.receiveMessages(),
                    0L,
                    (sum, message) -> sum + message);
            vertex.property(MEMORY_KEY, total);
        }
    }

    /**
     * @param memory supplies the current iteration number
     * @return {@code true} exactly when the current iteration is 1
     */
    @Override
    public boolean terminate(final Memory memory) {
        return memory.getIteration() == 1;
    }

    /** @return {@link GraphComputer.ResultGraph#NEW} */
    @Override
    public GraphComputer.ResultGraph getPreferredResultGraph() {
        return GraphComputer.ResultGraph.NEW;
    }

    /**
     * Copies this program via {@code super.clone()}.
     *
     * @return the cloned program
     * @throws RuntimeException wrapping a {@link CloneNotSupportedException}
     *                          raised by {@code super.clone()}
     */
    @Override
    public MyVertexProgram clone() {
        try {
            return (MyVertexProgram) super.clone();
        } catch (final CloneNotSupportedException cause) {
            throw new RuntimeException(cause);
        }
    }

}
{code}

  was:
When executing a VertexProgram that sends messages on multiple MessageScopes in 
a single iteration, then the messages behave as if they were sent on all scopes 
within that iteration.

The problem can be resolved by using only a single MessageScope per iteration, 
but this involves increasing the number of iterations.

An example of this behaviour is below:

{code:java}
public class TinkerTest {

    public static void main(String[] args) throws ExecutionException, 
InterruptedException {
        TinkerGraph graph = TinkerGraph.open();
        Vertex a = graph.addVertex("a");
        Vertex b = graph.addVertex("b");
        Vertex c = graph.addVertex("c");
        a.addEdge("edge", b);
        b.addEdge("edge", c);

        // Simple graph:
        // a -> b -> c

        // Execute a traversal program that sends an incoming message of "2" 
and an outgoing message of "1" from "b"
        // then each vertex sums any received messages
        ComputerResult result = graph.compute().program(new 
MyVertexProgram()).submit().get();

        // We expect the results to be {a=2, b=0, c=1}. Instead it is {a=3, 
b=0, c=3}
        
System.out.println(result.graph().traversal().V().group().by(Element::label).by("count").next());
    }
}

/**
 * Two-iteration vertex program used to reproduce TINKERPOP-1519.
 *
 * <p>In iteration 0 the vertex labelled "b" sends {@code 2L} on the scope
 * built from {@code __::inE} and {@code 1L} on the scope built from
 * {@code __::outE}. In iteration 1 every vertex sums the messages it
 * received and stores the sum in its "count" property.
 */
class MyVertexProgram implements VertexProgram<Long> {

    /** Name of the vertex property that receives the summed messages. */
    private static final String MEMORY_KEY = "count";

    /** The only element compute key this program declares. */
    private static final Set<String> COMPUTE_KEYS =
            Collections.singleton(MEMORY_KEY);

    /** Local scope created from {@code __::inE}; carries the value 2. */
    private final MessageScope.Local<Long> countMessageScopeIn =
            MessageScope.Local.of(__::inE);

    /** Local scope created from {@code __::outE}; carries the value 1. */
    private final MessageScope.Local<Long> countMessageScopeOut =
            MessageScope.Local.of(__::outE);

    /** No memory initialisation is needed for this program. */
    @Override
    public void setup(final Memory memory) {
    }

    /** @return {@link GraphComputer.Persist#VERTEX_PROPERTIES} */
    @Override
    public GraphComputer.Persist getPreferredPersist() {
        return GraphComputer.Persist.VERTEX_PROPERTIES;
    }

    /** @return the singleton set containing the "count" key */
    @Override
    public Set<String> getElementComputeKeys() {
        return COMPUTE_KEYS;
    }

    /**
     * Declares both scopes for every iteration.
     *
     * @param memory unused
     * @return a new set holding the in-edge and out-edge scopes
     */
    @Override
    public Set<MessageScope> getMessageScopes(final Memory memory) {
        return Sets.newHashSet(countMessageScopeIn, countMessageScopeOut);
    }

    /**
     * Sends the two messages from "b" in iteration 0 and stores the sum of
     * received messages on each vertex in iteration 1; does nothing in any
     * other iteration.
     *
     * @param vertex    the vertex being processed
     * @param messenger used to send and receive the {@code Long} messages
     * @param memory    supplies the current iteration number
     */
    @Override
    public void execute(Vertex vertex, Messenger<Long> messenger,
                        Memory memory) {
        final int iteration = memory.getIteration();

        if (iteration == 0) {
            // Only "b" is a sender; every other vertex stays silent.
            if (!vertex.label().equals("b")) {
                return;
            }
            messenger.sendMessage(this.countMessageScopeIn, 2L);
            messenger.sendMessage(this.countMessageScopeOut, 1L);
            return;
        }

        if (iteration == 1) {
            // Fold all received messages into one sum, starting from zero.
            final long total = IteratorUtils.reduce(
                    messenger.receiveMessages(),
                    0L,
                    (sum, message) -> sum + message);
            vertex.property(MEMORY_KEY, total);
        }
    }

    /**
     * @param memory supplies the current iteration number
     * @return {@code true} exactly when the current iteration is 1
     */
    @Override
    public boolean terminate(final Memory memory) {
        return memory.getIteration() == 1;
    }

    /** @return {@link GraphComputer.ResultGraph#NEW} */
    @Override
    public GraphComputer.ResultGraph getPreferredResultGraph() {
        return GraphComputer.ResultGraph.NEW;
    }

    /**
     * Copies this program via {@code super.clone()}.
     *
     * @return the cloned program
     * @throws RuntimeException wrapping a {@link CloneNotSupportedException}
     *                          raised by {@code super.clone()}
     */
    @Override
    public MyVertexProgram clone() {
        try {
            return (MyVertexProgram) super.clone();
        } catch (final CloneNotSupportedException cause) {
            throw new RuntimeException(cause);
        }
    }

}
{code}


> TinkerGraphComputer doesn't handle multiple MessageScopes in single iteration
> -----------------------------------------------------------------------------
>
>                 Key: TINKERPOP-1519
>                 URL: https://issues.apache.org/jira/browse/TINKERPOP-1519
>             Project: TinkerPop
>          Issue Type: Bug
>          Components: tinkergraph
>    Affects Versions: 3.1.1-incubating
>         Environment: Mac OSX
>            Reporter: Felix Chapman
>            Priority: Minor
>
> When executing a VertexProgram that sends messages on multiple MessageScopes 
> in a single iteration, then the messages behave as if they were sent on all 
> scopes within that iteration.
> e.g. if you send message {{A}} on {{out}} edges, and message {{B}} on {{in}} 
> edges, then {{A}} and {{B}} will instead be sent over both {{in}} and {{out}} 
> edges.
> The problem can be resolved by using only a single MessageScope per 
> iteration, but this involves increasing the number of iterations.
> An example of this behaviour is below:
> {code:java}
> public class TinkerTest {
>     public static void main(String[] args) throws ExecutionException, 
> InterruptedException {
>         TinkerGraph graph = TinkerGraph.open();
>         Vertex a = graph.addVertex("a");
>         Vertex b = graph.addVertex("b");
>         Vertex c = graph.addVertex("c");
>         a.addEdge("edge", b);
>         b.addEdge("edge", c);
>         // Simple graph:
>         // a -> b -> c
>         // Execute a traversal program that sends an incoming message of "2" 
> and an outgoing message of "1" from "b"
>         // then each vertex sums any received messages
>         ComputerResult result = graph.compute().program(new 
> MyVertexProgram()).submit().get();
>         // We expect the results to be {a=2, b=0, c=1}. Instead it is {a=3, 
> b=0, c=3}
>         
> System.out.println(result.graph().traversal().V().group().by(Element::label).by("count").next());
>     }
> }
> class MyVertexProgram implements VertexProgram<Long> {
>     private final MessageScope.Local<Long> countMessageScopeIn = 
> MessageScope.Local.of(__::inE);
>     private final MessageScope.Local<Long> countMessageScopeOut = 
> MessageScope.Local.of(__::outE);
>     private static final String MEMORY_KEY = "count";
>     private static final Set<String> COMPUTE_KEYS = 
> Collections.singleton(MEMORY_KEY);
>     @Override
>     public void setup(final Memory memory) {}
>     @Override
>     public GraphComputer.Persist getPreferredPersist() {
>         return GraphComputer.Persist.VERTEX_PROPERTIES;
>     }
>     @Override
>     public Set<String> getElementComputeKeys() {
>         return COMPUTE_KEYS;
>     }
>     @Override
>     public Set<MessageScope> getMessageScopes(final Memory memory) {
>         return Sets.newHashSet(countMessageScopeIn, countMessageScopeOut);
>     }
>     @Override
>     public void execute(Vertex vertex, Messenger<Long> messenger, Memory 
> memory) {
>         switch (memory.getIteration()) {
>             case 0:
>                 if (vertex.label().equals("b")) {
>                     messenger.sendMessage(this.countMessageScopeIn, 2L);
>                     messenger.sendMessage(this.countMessageScopeOut, 1L);
>                 }
>                 break;
>             case 1:
>                 long edgeCount = 
> IteratorUtils.reduce(messenger.receiveMessages(), 0L, (a, b) -> a + b);
>                 vertex.property(MEMORY_KEY, edgeCount);
>                 break;
>         }
>     }
>     @Override
>     public boolean terminate(final Memory memory) {
>         return memory.getIteration() == 1;
>     }
>     @Override
>     public GraphComputer.ResultGraph getPreferredResultGraph() {
>         return GraphComputer.ResultGraph.NEW;
>     }
>     @Override
>     public MyVertexProgram clone() {
>         try {
>             return (MyVertexProgram) super.clone();
>         } catch (final CloneNotSupportedException e) {
>             throw new RuntimeException(e);
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to