[
https://issues.apache.org/jira/browse/TINKERPOP-1564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687218#comment-15687218
]
Marko A. Rodriguez edited comment on TINKERPOP-1564 at 11/23/16 1:34 AM:
-------------------------------------------------------------------------
A {{Graph}} is composed of {{Partitions}}. A {{Partition}} is responsible for
maintaining a subgraph of the global graph. For backwards compatibility, a
{{SingletonPartitions}} is simply a wrapper around the {{Graph}}.
{code}
public interface Graph {
    public default Partitions getPartitions() {
        return new SingletonPartitions(this);
    }
}
{code}
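A minimal sketch of what that wrapper could look like (illustrative only; only the {{SingletonPartitions}} name comes from the snippet above):
{code}
// illustrative sketch: a single partition that simply delegates to the wrapped Graph
public class SingletonPartitions implements Partitions {

    private final Partition singlePartition;

    public SingletonPartitions(final Graph graph) {
        // the lone partition spans the entire graph
        this.singlePartition = new Partition() {
            public boolean contains(final Element element) {
                return true; // every element is "local"
            }
            public Iterator<Vertex> vertices(final Object... ids) {
                return graph.vertices(ids);
            }
            public Iterator<Edge> edges(final Object... ids) {
                return graph.edges(ids);
            }
        };
    }

    public List<Partition> getPartitions() {
        return Collections.singletonList(this.singlePartition);
    }

    public Partition getPartition(final Element element) {
        return this.singlePartition;
    }
}
{code}
With a default like this, existing single-machine providers get partition awareness for free.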
A {{Partitions}} is the set of all partitions associated with the {{Graph}}.
{{Partitions}} maintains the "hash algorithm" which determines which
{{Partition}} an {{Element}} is in.
{code}
public interface Partitions {
    public List<Partition> getPartitions();
    public Partition getPartition(final Element element);
}
{code}
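For illustration, a {{Partitions}} implementation might realize the "hash algorithm" by hashing element ids onto a fixed list of partitions (the class name and hashing scheme here are assumptions, not part of the proposal):
{code}
// illustrative sketch: stable id-hash routing over a fixed list of partitions
public class HashedPartitions implements Partitions {

    private final List<Partition> partitions;

    public HashedPartitions(final List<Partition> partitions) {
        this.partitions = partitions;
    }

    public List<Partition> getPartitions() {
        return this.partitions;
    }

    public Partition getPartition(final Element element) {
        // stable mapping from element id to partition index
        final int index = Math.floorMod(element.id().hashCode(), this.partitions.size());
        return this.partitions.get(index);
    }
}
{code}
Any stable element-to-partition mapping would do; the important property is that {{Partitions.getPartition()}} agrees across every machine in the cluster.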
A {{Partition}} is a subgraph of the entire {{Graph}}. In adjacency list graph
systems, a {{Partition}} will maintain a set of vertices and their incident
edges and properties. It should be possible to stream out all the
vertices/edges associated with the {{Partition}}, where the union of the
iterators across all partitions is equivalent to
{{Graph.vertices()}}/{{Graph.edges()}}.
{code}
public interface Partition {

    public boolean contains(final Element element);
    public Iterator<Vertex> vertices(final Object... ids);
    public Iterator<Edge> edges(final Object... ids);

    public static interface PhysicalPartition extends Partition {}

    public static interface VirtualPartition extends Partition {}
}
{code}
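To make that iterator contract concrete, a quick (illustrative) check could union the per-partition iterators and compare against the global accessor:
{code}
// rough sketch: iterating every partition should cover the same elements as Graph.vertices()
final Set<Object> ids = new HashSet<>();
for (final Partition partition : graph.getPartitions().getPartitions()) {
    partition.vertices().forEachRemaining(vertex -> ids.add(vertex.id()));
}
// 'ids' should equal the id set produced by graph.vertices() (iteration order may differ)
{code}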
A {{PartitionTraversal}} should represent a {{Traversal}} at a particular
{{Partition}}. We currently do this, but that partition is simply a single
machine in the cluster. We should generalize to support {{PartitionTraversals}}
that mirror each other across the cluster.
{code}
public interface PartitionTraversal<S,E> extends Traversal.Admin<S,E> {

    // do something with Client/Cluster here (implementation specific to GremlinServer)
    public void insert(final Traverser.Admin<?> traverser);

    public default void accept(final Traverser.Admin<?> traverser) {
        final Step<?,?> step = this.getStepById(traverser.getStepId());
        step.addStart(traverser.attach());
    }
}
{code}
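The wire mechanics of {{insert()}} are implementation specific, per the comment above. As a minimal sketch, assuming a hypothetical {{PartitionConnection}} with a {{send()}} method wrapping the {{Client}}/{{Cluster}} machinery, the sending side could simply detach and ship the traverser, and the receiving side would re-attach it via {{accept()}}:
{code}
// illustrative sketch only: PartitionConnection and send() are hypothetical, not an existing API
public abstract class ClientPartitionTraversal<S,E> implements PartitionTraversal<S,E> {

    private final PartitionConnection connection; // hypothetical wrapper around Client/Cluster

    public ClientPartitionTraversal(final PartitionConnection connection) {
        this.connection = connection;
    }

    public void insert(final Traverser.Admin<?> traverser) {
        // ship the detached traverser to the machine that owns its data;
        // the receiving PartitionTraversal re-attaches it via accept()
        this.connection.send(traverser.detach());
    }
}
{code}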
Given the concept of {{Partitions}}, {{PartitionStep}} will be able to route
traversers to different {{PartitionTraversals}}.
{code}
public class PartitionStep<S> extends AbstractStep<S,S> {

    private final List<PartitionTraversal> partitionTraversals;
    private final Partitions partitions;
    private final Partition localPartition;

    public PartitionStep(final Traversal.Admin traversal, final Partition localPartition,
                         final List<PartitionTraversal> partitionTraversals) {
        super(traversal);
        this.partitionTraversals = partitionTraversals;
        this.localPartition = localPartition;
        this.partitions = this.getTraversal().getGraph().getPartitions();
    }

    public Traverser.Admin<S> processNextStart() {
        while (true) {
            final Traverser.Admin<S> traverser = this.starts.next();
            if (traverser.get() instanceof Element && !this.localPartition.contains((Element) traverser.get())) {
                // the element lives on another partition: route the traverser there
                final PartitionTraversal partitionTraversal = this.partitionTraversals.stream()
                        .filter(p -> p.getPartition().contains((Element) traverser.get()))
                        .findFirst().get(); // we won't use stream, but this is the basic idea
                partitionTraversal.insert(traverser.detach());
            } else
                return traverser;
        }
    }
}
{code}
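As the "we won't use stream" comment suggests, a direct lookup would replace the scan. The earlier revision below keys the traversals by {{Partition}}; in that form the routing body reduces to a constant-time lookup via {{Partitions.getPartition()}}, e.g.:
{code}
// sketch of the non-stream routing path, assuming partitionTraversals is a Map<Partition,PartitionTraversal>
final Partition target = this.partitions.getPartition((Element) traverser.get());
final PartitionTraversal partitionTraversal = this.partitionTraversals.get(target);
partitionTraversal.insert(traverser.detach());
{code}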
------
The architecture is thus:
{code}
                                             /--PartitionConnection->PartitionTraversal
                                            /        [Client]
client --RemoteConnection--> RemoteTraversal----PartitionConnection->PartitionTraversal
                                            \        [Client]
                                             \--PartitionConnection->PartitionTraversal
                                                     [Client]
{code}
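The {{PartitionConnection}} referenced in the diagram is not spelled out above. A minimal sketch, assuming it is simply a peer-to-peer analogue of {{RemoteConnection}} that carries traversers between sibling {{PartitionTraversals}}:
{code}
// illustrative sketch only: a hypothetical peer analogue of RemoteConnection
public interface PartitionConnection extends AutoCloseable {
    // ship a detached traverser to the PartitionTraversal on the other side of this connection
    public void send(final Traverser.Admin<?> traverser);
    // the remote PartitionTraversal this connection points at
    public PartitionTraversal<?,?> getPartitionTraversal();
}
{code}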
was (Author: okram):
A {{Graph}} is composed of {{Partitions}}. A {{Partition}} is responsible for
maintaining a subgraph of the global graph. For backwards compatibility, a
{{SingletonPartitions}} is simply a wrapper around the {{Graph}}.
{code}
public interface Graph {
    public default Partitions getPartitions() {
        return new SingletonPartitions(this);
    }
}
{code}
A {{Partitions}} is the set of all partitions associated with the {{Graph}}.
{{Partitions}} maintains the "hash algorithm" which determines which
{{Partition}} an {{Element}} is in.
{code}
public interface Partitions {
    public List<Partition> getPartitions();
    public Partition getPartition(final Element element);
}
{code}
A {{Partition}} is a subgraph of the entire {{Graph}}. In adjacency list graph
systems, a {{Partition}} will maintain a set of vertices and their incident
edges and properties. It should be possible to stream out all the
vertices/edges associated with the {{Partition}}, where the union of the
iterators across all partitions is equivalent to
{{Graph.vertices()}}/{{Graph.edges()}}.
{code}
public interface Partition {
    public boolean contains(final Element element);
    public Iterator<Vertex> vertices(final Object... ids);
    public Iterator<Edge> edges(final Object... ids);
}
{code}
A {{RemoteTraversal}} should represent a {{Traversal}} at a particular
{{Partition}}. We currently do this, but that partition is simply a single
machine in the cluster. We should generalize to support {{RemoteTraversals}}
that mirror each other across the cluster.
{code}
public interface RemoteTraversal<S,E> extends Traversal.Admin<S,E> {

    // do something with Client/Cluster here (implementation specific to GremlinServer)
    public void submit(final Traverser.Admin<?> traverser);

    public default void accept(final Traverser.Admin<?> traverser) {
        final Step<?,?> step = this.getStepById(traverser.getStepId());
        step.addStart(traverser.attach());
    }
}
{code}
Given the concept of {{Partitions}}, {{RemoteStep}} will be able to route
traversers to different {{RemoteTraversals}}.
{code}
public class RemoteStep<S> extends AbstractStep<S,S> {

    private final Map<Partition,RemoteTraversal> partitionTraversals;
    private final Partitions partitions;
    private final Partition localPartition;

    public RemoteStep(final Traversal.Admin traversal, final Partition localPartition,
                      final Map<Partition,RemoteTraversal> partitionTraversals) {
        super(traversal);
        this.partitionTraversals = partitionTraversals;
        this.localPartition = localPartition;
        this.partitions = this.getTraversal().getGraph().getPartitions();
    }

    public Traverser.Admin<S> processNextStart() {
        while (true) {
            final Traverser.Admin<S> traverser = this.starts.next();
            if (traverser.get() instanceof Element && !this.localPartition.contains((Element) traverser.get())) {
                // the element lives on another partition: route the traverser there
                final Partition partition = this.partitions.getPartition((Element) traverser.get());
                final RemoteTraversal remoteTraversal = this.partitionTraversals.get(partition);
                remoteTraversal.submit(traverser.detach());
            } else
                return traverser;
        }
    }
}
{code}
> Distributed OLTP Traversals via RemoteStep
> ------------------------------------------
>
> Key: TINKERPOP-1564
> URL: https://issues.apache.org/jira/browse/TINKERPOP-1564
> Project: TinkerPop
> Issue Type: Improvement
> Components: driver, process, server
> Affects Versions: 3.2.3
> Reporter: Marko A. Rodriguez
>
> This proposal unifies OLTP and OLAP into a single framework that removes the
> need for OLAP {{GraphComputer}} by introducing distributed, data local
> processing to OLTP. In essence, this is a proposal for a step-by-step query
> routing framework within {{Traversal}}. This proposal can work across
> machines in a cluster, threads on a machine, or hierarchically across both
> machines and threads. The example presented will discuss distribution across
> machines in a cluster, as it is the most complicated scenario.
> Currently, an OLTP traversal executes at a particular machine (or thread) and
> pulls vertex/edge/etc. data to it accordingly in order to solve the
> traversal. In OLAP, the traversal is cloned and distributed to all machines
> in the cluster and traversals communicate with one another by sending
> {{Traversers}} (i.e. messages) between themselves ensuring data local
> processing. Given recent advancements in GremlinServer and
> {{RemoteTraversal}}, it is possible to add traverser routing to OLTP and
> thus, effect the computational paradigm of Gremlin OLAP in Gremlin OLTP with
> some added benefits not possible in Gremlin OLAP.
> Assume a 4 machine cluster and the following traversal:
> {code}
> g.V(1).out('knows').has('age',gt(20)).out('likes').values('name')
> {code}
> Every time there is a "walk" (adjacency), it is possible that the
> {{Traverser}} is no longer accessing data local to the current machine. In
> order to do data local query routing, every adjacency would feed into a
> {{PartitionStep}}. The traversal above would be cloned (via {{Bytecode}}
> distribution) across the cluster where "sibling" {{PartitionSteps}} would
> have network access to one another using the same protocol of
> {{RemoteConnection}} though called {{PartitionConnection}}. Thus, given the 4
> node cluster example, the above traversal would be overlaid as below. Note
> that {{partition()}} would not be a new step in the language, but simply
> provided here to show where {{PartitionStrategy}} would insert
> {{PartitionSteps}} into the traversal.
> {code}
> g.V(1).out('knows').partition().has('age',gt(20)).out('likes').partition().values('name').partition()
>                        |                                          |                          |
>      __.out('knows').partition().has('age',gt(20)).out('likes').partition().values('name').partition()
>                        |                                          |                          |
>      __.out('knows').partition().has('age',gt(20)).out('likes').partition().values('name').partition()
>                        |                                          |                          |
>      __.out('knows').partition().has('age',gt(20)).out('likes').partition().values('name').partition()
> {code}
> The top traversal is called the "master traversal" and the other three
> "worker traversals." Note that this is identical to current Gremlin OLAP.
> Now, the master traversal would be the traversal that is {{.next()}}'d for
> results. So, when the "master traversal" is {{next()}}'d, {{g.V(1)}} will
> fetch {{v[1]}} and then its outgoing knows-adjacencies. These adjacent
> "reference vertices" would be fed into the first {{remote()}} and a "routing
> algorithm" would determine where in the cluster the particular vertex's data
> is. Thus, {{partition()}} ({{PartitionStep}}) serves as a router, pushing
> {{Traversers}} local to the data. Finally, note that the final
> {{PartitionSteps}} can only feed back to the "master traversal" for ultimate
> aggregation and return to the user.
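> As a rough sketch (not existing TinkerPop classes: {{PartitionStrategy}}, {{PartitionStep}}, {{PartitionTraversal}} and {{Partition}} are names from this proposal), such a strategy could splice a {{PartitionStep}} in after every adjacency step:
> {code}
> // illustrative sketch: insert a PartitionStep after every adjacency (VertexStep) in the traversal
> public class PartitionStrategy extends AbstractTraversalStrategy<TraversalStrategy.DecorationStrategy>
>         implements TraversalStrategy.DecorationStrategy {
>
>     private final Partition localPartition;
>     private final List<PartitionTraversal> partitionTraversals;
>
>     public PartitionStrategy(final Partition localPartition,
>                              final List<PartitionTraversal> partitionTraversals) {
>         this.localPartition = localPartition;
>         this.partitionTraversals = partitionTraversals;
>     }
>
>     public void apply(final Traversal.Admin<?, ?> traversal) {
>         // gather the adjacency steps first so the insertions do not disturb iteration
>         for (final Step step : TraversalHelper.getStepsOfClass(VertexStep.class, traversal)) {
>             TraversalHelper.insertAfterStep(
>                     new PartitionStep(traversal, this.localPartition, this.partitionTraversals),
>                     step, traversal);
>         }
>     }
> }
> {code}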
> TinkerPop currently has all the structures in place to make this possible:
> 1. Encapsulation of computational metadata via {{Traverser}}.
> 2. The ability to detach {{Traversers}} and migrate/serialize them via
> {{Traverser.detach()}} and {{Traverser.attach()}}.
> 3. The concept of {{ReferenceElement}} so the traverser only carries
> with it enough information to re-attach at the remote site.
> 4. {{Bytecode}} and the ability to send {{Traversals}} across the
> cluster.
> 5. GremlinServer and {{Client}}/{{Cluster}} messaging protocol.
> What does {{PartitionStep}} look like? *Please see comments below*
> Here are the benefits of this model:
> * Gremlin OLTP is Gremlin OLAP. The semantics of Gremlin OLAP are exactly
> what is proposed here but with the added benefit that message passing happens
> at the partition/subgraph level, not the star vertex level.
> * There is no need for {{SparkGraphComputer}} as GremlinServer now plays the
> role of SparkServer. The added benefit: there is no pulling data from the graph
> database and re-representing it in an RDD or {{SequenceFile}}.
> * No longer are "local children traversals" the boundary for "OLAP." Local
> children can be processed beyond the star graph, though doing so may require
> pulling data from a remote machine. However, given a good graph
> partitioning algorithm, local children will most likely NOT leave the
> subgraph partition and thus, will remain a local computation.
> * Failover is already built into the architecture. If a {{PartitionStep}}
> cannot be accessed, but the machine's data is still available (perhaps via
> replication), then data will simply be pulled over the wire instead of
> traversers being routed to the "dead node."
> * The infrastructure for side-effects and reducing barrier steps already
> implemented for Gremlin OLAP would automatically work for distributed Gremlin
> OLTP.
> * If the entire graph is hot in-memory across the cluster, then distributed
> in-memory graph computing is possible. Again, no more linear-scans over
> partitions like with Giraph/Spark/etc. ({{GraphComputer}}).
> * If transactions are worked out, then distributed OLTP Gremlin provides
> mutation capabilities (something currently not implemented for
> {{GraphComputer}}). That is, {{addV}}, {{addE}}, {{drop}}, etc. just work.
> **Caveat: transactions in this environment across GremlinServer seem
> difficult.**
> So that's that. This could very well be the future of Gremlin OLAP. The
> divide between OLAP and OLTP would go away, the codebase would be
> simplified, and the computational gains in terms of performance and
> expressivity would be great. This is a big-deal idea.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)