[ 
https://issues.apache.org/jira/browse/TINKERPOP-1564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687218#comment-15687218
 ] 

Marko A. Rodriguez edited comment on TINKERPOP-1564 at 11/23/16 1:34 AM:
-------------------------------------------------------------------------

A {{Graph}} is composed of {{Partitions}}. A {{Partition}} is responsible for 
maintaining a subgraph of the global graph. For backwards compatibility, a 
{{SingletonPartitions}} is simply a wrapper around {{Graph}}.

{code}
public interface Graph {
  public default Partitions getPartitions() {
    return new SingletonPartitions(this);
  }
}
{code}

A {{Partitions}} is the set of all partitions associated with the {{Graph}}. 
{{Partitions}} maintains the "hash algorithm" which determines which 
{{Partition}} an {{Element}} is in.

{code}
public interface Partitions {
  public List<Partition> getPartitions();
  public Partition getPartition(final Element element);
}
{code}
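
As a rough illustration of the "hash algorithm" idea, the sketch below maps an element id to a partition index by hashing the id modulo the partition count. It uses plain Java stand-ins rather than TinkerPop types: {{HashPartitions}}, {{partitionIndex}}, and the modulo scheme are assumptions made for illustration and are not part of the proposal.

```java
// Hypothetical stand-in for a Partitions implementation: it only answers
// "which partition index owns this element id?" using a hash of the id.
final class HashPartitions {
    private final int partitionCount;

    HashPartitions(final int partitionCount) {
        if (partitionCount < 1)
            throw new IllegalArgumentException("need at least one partition");
        this.partitionCount = partitionCount;
    }

    // floorMod keeps the index non-negative even when hashCode() is negative.
    int partitionIndex(final Object elementId) {
        return Math.floorMod(elementId.hashCode(), this.partitionCount);
    }

    int size() {
        return this.partitionCount;
    }
}
```

With one partition every id maps to index 0, which corresponds to the backwards-compatible single-partition case described above.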


A {{Partition}} is a subgraph of the entire {{Graph}}. In adjacency list graph 
systems, a {{Partition}} will maintain a set of vertices and their incident 
edges and properties. It should be possible to stream out all the 
vertices/edges associated with the {{Partition}}, where the union of the 
iterators across all partitions is equivalent to 
{{Graph.vertices()}}/{{Graph.edges()}}.

{code}
public interface Partition {
  public boolean contains(final Element element);
  public Iterator<Vertex> vertices(final Object... ids);
  public Iterator<Edge> edges(final Object... ids);

  public static interface PhysicalPartition extends Partition {}
  public static interface VirtualPartition extends Partition {}
}
{code}
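
The equivalence with {{Graph.vertices()}} can be stated as a check. The sketch below is a minimal version of that check over vertex ids only, assuming partitions do not overlap (replication would relax this). {{IdPartition}} and {{PartitionCoverage}} are hypothetical names, not TinkerPop classes.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory partition that tracks vertex ids only.
final class IdPartition {
    private final Set<Integer> vertexIds = new HashSet<>();

    void add(final int vertexId) {
        this.vertexIds.add(vertexId);
    }

    boolean contains(final int vertexId) {
        return this.vertexIds.contains(vertexId);
    }

    Iterator<Integer> vertices() {
        return this.vertexIds.iterator();
    }
}

final class PartitionCoverage {
    // True when the partition iterators, taken together, yield every graph
    // vertex exactly once and nothing else (assumes non-overlapping partitions).
    static boolean coversExactly(final Set<Integer> graphVertexIds,
                                 final List<IdPartition> partitions) {
        final Set<Integer> seen = new HashSet<>();
        for (final IdPartition partition : partitions) {
            final Iterator<Integer> vertices = partition.vertices();
            while (vertices.hasNext()) {
                if (!seen.add(vertices.next()))
                    return false; // the same vertex is owned by two partitions
            }
        }
        return seen.equals(graphVertexIds);
    }
}
```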

A {{PartitionTraversal}} should represent a {{Traversal}} at a particular 
{{Partition}}. We currently do this, but that partition is simply a single 
machine in the cluster. We should generalize to support {{PartitionTraversals}} 
that mirror each other across the cluster.

{code}
public interface PartitionTraversal<S,E> extends Traversal.Admin<S,E> {
  
  // do something with Client/Cluster here (implementation specific to GremlinServer)
  public void insert(final Traverser.Admin<?> traverser);

  public default void accept(final Traverser.Admin<?> traverser) {
    final Step<?,?> step = this.getStepById(traverser.getStepId());
    step.addStart(traverser.attach());
  }
}
{code}
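
The {{accept()}} logic relies on every mirrored traversal using the same step ids, so that a traverser arriving from another partition can resume at the right step. A minimal sketch of that lookup, with hypothetical stand-in types ({{SketchTraverser}}, {{SketchTraversal}}) in place of the TinkerPop ones and no re-attachment to a graph:

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical detached traverser: carries the id of the step it resumes at.
final class SketchTraverser {
    final String stepId;
    final Object value;

    SketchTraverser(final String stepId, final Object value) {
        this.stepId = stepId;
        this.value = value;
    }
}

// Hypothetical traversal clone: one queue of pending starts per step id.
final class SketchTraversal {
    private final Map<String, Queue<SketchTraverser>> startsByStepId = new HashMap<>();

    void addStep(final String stepId) {
        this.startsByStepId.put(stepId, new ArrayDeque<>());
    }

    // Mirrors accept(): find the step by id and add the traverser to its starts.
    void accept(final SketchTraverser traverser) {
        final Queue<SketchTraverser> starts = this.startsByStepId.get(traverser.stepId);
        if (starts == null)
            throw new IllegalArgumentException("unknown step id: " + traverser.stepId);
        starts.add(traverser);
    }

    // Number of traversers waiting at the given step.
    int pending(final String stepId) {
        final Queue<SketchTraverser> starts = this.startsByStepId.get(stepId);
        return starts == null ? 0 : starts.size();
    }
}
```

If the clones did not agree on step ids, the lookup would fail, which is why the sketch rejects unknown ids instead of silently dropping the traverser.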

Given the concept of {{Partitions}}, {{PartitionStep}} will be able to route 
traversers to different {{PartitionTraversals}}.

{code}
public class PartitionStep<S> extends AbstractStep<S,S> {
  
  private final List<PartitionTraversal> partitionTraversals;
  private final Partitions partitions;
  private final Partition localPartition;

  public PartitionStep(final Traversal.Admin traversal, final Partition localPartition, final List<PartitionTraversal> partitionTraversals) {
    super(traversal);
    this.partitionTraversals = partitionTraversals;
    this.localPartition = localPartition;
    this.partitions = this.getTraversal().getGraph().getPartitions();
  }

  public Traverser.Admin<S> processNextStart() {
    // keep pulling until a traverser is local; starts.next() throws NoSuchElementException when exhausted
    while (true) {
      final Traverser.Admin<S> traverser = this.starts.next();
      if (traverser.get() instanceof Element && !this.localPartition.contains((Element) traverser.get())) {
        // hand the traverser to the sibling traversal whose partition holds the element
        final PartitionTraversal partitionTraversal = this.partitionTraversals.stream().
            filter(p -> p.getPartition().contains((Element) traverser.get())).
            findFirst().get(); // we won't use stream, but this is the basic idea
        partitionTraversal.insert(traverser.detach());
      } else
        return traverser;
    }
  }
}
{code}
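
To make the routing behaviour concrete without TinkerPop on the classpath, here is a small simulation, offered as a sketch only. Vertex ids stand in for traversers, an in-memory queue stands in for the network hop behind {{insert()}}, and ownership is assumed to be id modulo the worker count. {{SketchWorker}} and {{SketchRouter}} are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a PartitionTraversal: just an inbox of vertex ids.
final class SketchWorker {
    final Queue<Integer> inbox = new ArrayDeque<>();

    // Stands in for insert(); a real implementation would send over the network.
    void insert(final int vertexId) {
        this.inbox.add(vertexId);
    }
}

// Hypothetical stand-in for PartitionStep running at one worker.
final class SketchRouter {
    private final List<SketchWorker> workers;
    private final int localIndex;

    SketchRouter(final List<SketchWorker> workers, final int localIndex) {
        this.workers = workers;
        this.localIndex = localIndex;
    }

    // Assumption for this sketch: a vertex is owned by (id mod worker count).
    private int ownerOf(final int vertexId) {
        return Math.floorMod(vertexId, this.workers.size());
    }

    // Returns the ids that continue locally; all others are forwarded to their owner.
    List<Integer> route(final List<Integer> vertexIds) {
        final List<Integer> local = new ArrayList<>();
        for (final int vertexId : vertexIds) {
            final int owner = ownerOf(vertexId);
            if (owner == this.localIndex)
                local.add(vertexId);
            else
                this.workers.get(owner).insert(vertexId);
        }
        return local;
    }
}
```

With four workers and the router at index 0, ids 4 and 8 continue locally, while 5, 6, and 11 land in the inboxes of workers 1, 2, and 3 respectively.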

------

The architecture is thus: 

{code}
                                             /--PartitionConnection->PartitionTraversal
                                            /      [Client]
client --RemoteConnection--> RemoteTraversal----PartitionConnection->PartitionTraversal
                                            \      [Client]
                                             \--PartitionConnection->PartitionTraversal
                                                   [Client]
{code}


was (Author: okram):
A {{Graph}} is composed of {{Partitions}}. A {{Partition}} is responsible for 
maintaining a subgraph of the global graph. For backwards compatibility, a 
{{SingletonPartitions}} is simply a wrapper around {{Graph}}.

{code}
public interface Graph {
  public default Partitions getPartitions() {
        return new SingletonPartitions(this);
  }
}
{code}

A {{Partitions}} is the set of all partitions associated with the {{Graph}}. 
{{Partitions}} maintains the "hash algorithm" which determines which 
{{Partition}} an {{Element}} is in.

{code}
public interface Partitions {
        public List<Partition> getPartitions();
        public Partition getPartition(final Element element);
}
{code}


A {{Partition}} is a subgraph of the entire {{Graph}}. In adjacency list graph 
systems, a {{Partition}} will maintain a set of vertices and their incident 
edges and properties. It should be possible to stream out all the 
vertices/edges associated with the {{Partition}}, where the union of the 
iterators across all partitions is equivalent to 
{{Graph.vertices()}}/{{Graph.edges()}}.

{code}
public interface Partition {
        public boolean contains(final Element element);
        public Iterator<Vertex> vertices(final Object... ids);
        public Iterator<Edge> edges(final Object... ids);
}
{code}

A {{RemoteTraversal}} should represent a {{Traversal}} at a particular 
{{Partition}}. We currently do this, but that partition is simply a single 
machine in the cluster. We should generalize to support {{RemoteTraversals}} 
that mirror each other across the cluster.

{code}
public interface RemoteTraversal<S,E> extends Traversal.Admin<S,E> {
        
        // do something with Client/Cluster here (implementation specific to GremlinServer)
        public void submit(final Traverser.Admin<?> traverser);

        public default void accept(final Traverser.Admin<?> traverser) {
                final Step<?,?> step = this.getStepById(traverser.getStepId());
                step.addStart(traverser.attach());
        }
}
{code}

Given the concept of {{Partitions}}, {{RemoteStep}} will be able to route 
traversers to different {{RemoteTraversals}}.

{code}
public class RemoteStep<S> extends AbstractStep<S,S> {
        
        private final Map<Partition,RemoteTraversal> partitionTraversals;
        private final Partitions partitions;
        private final Partition localPartition;

        public RemoteStep(final Traversal.Admin traversal, final Partition localPartition, final Map<Partition,RemoteTraversal> partitionTraversals) {
          super(traversal);
          this.partitionTraversals = partitionTraversals;
          this.localPartition = localPartition;
          this.partitions = this.getTraversal().getGraph().getPartitions();
        }

        public Traverser.Admin<S> processNextStart() {
                final Traverser.Admin<S> traverser = this.starts.next();
                if(traverser.get() instanceof Element && !this.localPartition.contains(traverser.get())) {
                        final Partition partition = this.partitions.getPartition(traverser.get());
                        final RemoteTraversal remoteTraversal = this.partitionTraversals.get(partition);
                        remoteTraversal.submit(traverser.detach());
                } else
                        return traverser;
        }
}
{code}

> Distributed OLTP Traversals via RemoteStep
> ------------------------------------------
>
>                 Key: TINKERPOP-1564
>                 URL: https://issues.apache.org/jira/browse/TINKERPOP-1564
>             Project: TinkerPop
>          Issue Type: Improvement
>          Components: driver, process, server
>    Affects Versions: 3.2.3
>            Reporter: Marko A. Rodriguez
>
> This proposal unifies OLTP and OLAP into a single framework that removes the 
> need for OLAP {{GraphComputer}} by introducing distributed, data local 
> processing to OLTP. In essence, this is a proposal for a step-by-step query 
> routing framework within {{Traversal}}. This proposal can work across 
> machines in a cluster, threads on a machine, or hierarchically across both 
> machines and threads. The example presented will discuss distribution across 
> machines in a cluster, as it's the most complicated scenario.
> Currently, an OLTP traversal executes at a particular machine (or thread) and 
> pulls vertex/edge/etc. data to it accordingly in order to solve the 
> traversal. In OLAP, the traversal is cloned and distributed to all machines 
> in the cluster and traversals communicate with one another by sending 
> {{Traversers}} (i.e. messages) between themselves ensuring data local 
> processing. Given recent advancements in GremlinServer and 
> {{RemoteTraversal}}, it is possible to add traverser routing to OLTP and 
> thus, effect the computational paradigm of Gremlin OLAP in Gremlin OLTP with 
> some added benefits not possible in Gremlin OLAP.
> Assume a 4 machine cluster and the following traversal:
> {code}
> g.V(1).out('knows').has('age',gt(20)).out('likes').values('name')
> {code}
> Every time there is a "walk" (adjacency), it is possible that the 
> {{Traverser}} is no longer accessing data local to the current machine. In 
> order to do data local query routing, every adjacency would feed into a 
> {{PartitionStep}}. The traversal above would be cloned (via {{Bytecode}} 
> distribution) across the cluster where "sibling" {{PartitionSteps}} would 
> have network access to one another using the same protocol of 
> {{RemoteConnection}} though called {{PartitionConnection}}. Thus, given the 4 
> node cluster example, the above traversal would be overlaid as below. Note 
> that {{partition()}} would not be a new step in the language, but simply 
> provided here to show where {{PartitionStrategy}} would insert 
> {{PartitionSteps}} into the traversal.
> {code}
> g.V(1).out('knows').partition().has('age',gt(20)).out('likes').partition().values('name').partition()
>                        |                                           |                         ^
>     __.out('knows').partition().has('age',gt(20)).out('likes').partition().values('name').partition()
>                        |                                           |                         |
>     __.out('knows').partition().has('age',gt(20)).out('likes').partition().values('name').partition()
>                        |                                           |                         |
>     __.out('knows').partition().has('age',gt(20)).out('likes').partition().values('name').partition()
> {code}
> The top traversal is called the "master traversal" and the other three 
> "worker traversals." Note that this is identical to current Gremlin OLAP. 
> Now, the master traversal would be the traversal that is {{.next()}}'d for 
> results. So, when the "master traversal" is {{next()}}'d, {{g.V(1)}} will 
> fetch {{v[1]}} and then its outgoing knows-adjacencies. These adjacent 
> "reference vertices" would be fed into the first {{partition()}} and a "routing 
> algorithm" would determine where in the cluster the particular vertex's data 
> is. Thus, {{partition()}} ({{PartitionStep}}) serves as a router, pushing 
> {{Traversers}} local to the data. Finally, note that the final 
> {{PartitionSteps}} can only feed back to the "master traversal" for ultimate 
> aggregation and return to the user. 
> TinkerPop currently has all the structures in place to make this possible:
>       1. Encapsulation of computational metadata via {{Traverser}}.
>       2. The ability to detach {{Traversers}} and migrate/serialize them via 
> {{Traverser.detach()}} and {{Traverser.attach()}}.
>       3. The concept of {{ReferenceElement}} so the traverser only carries 
> with it enough information to re-attach at the remote site.
>       4. {{Bytecode}} and the ability to send {{Traversals}} across the 
> cluster.
>       5. GremlinServer and {{Client}}/{{Cluster}} messaging protocol.
> What does {{PartitionStep}} look like? *Please see comments below*
> Here are the benefits of this model:
> * Gremlin OLTP is Gremlin OLAP. The semantics of Gremlin OLAP are exactly 
> what is proposed here but with the added benefit that message passing happens 
> at the partition/subgraph level, not the star vertex level.
> * There is no need for {{SparkGraphComputer}} as GremlinServer now plays the 
> role of SparkServer. The added benefit: no pulling data from the graph 
> database and re-representing it in an RDD or {{SequenceFile}}.
> * No longer are "local children traversals" the boundary for "OLAP." Local 
> children can be processed beyond the star graph, though this would require 
> pulling data from a remote machine when necessary. However, given a good graph 
> partitioning algorithm, local children will most likely NOT leave the 
> subgraph partition and thus, will remain a local computation.
> * Failover is already built into the architecture. If a {{PartitionStep}} 
> cannot be accessed, but the machine's data is still available (perhaps via 
> replication), then data will simply be pulled over the wire instead of 
> traversers routed to the "dead node." 
> * The infrastructure for side-effects and reducing barrier steps already 
> implemented for Gremlin OLAP would automatically work for distributed Gremlin 
> OLTP.
> * If the entire graph is hot in-memory across the cluster, then distributed 
> in-memory graph computing is possible. Again, no more linear-scans over 
> partitions like with Giraph/Spark/etc. ({{GraphComputer}}).
> * If transactions are worked out, then distributed OLTP Gremlin provides 
> mutation capabilities (something currently not implemented for 
> {{GraphComputer}}). That is {{addV}}, {{addE}}, {{drop}}, etc. just works. 
> **Caveat: transactions in this environment across GremlinServer seem 
> difficult.**
> So that's that. This could very well be the future of Gremlin OLAP. The 
> disjoint between OLAP and OLTP would go away, the codebase would be 
> simplified, and the computational gains in terms of performance and 
> expressivity would be great. This is a big deal idea.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
