[ 
https://issues.apache.org/jira/browse/TINKERPOP-1288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15274237#comment-15274237
 ] 

ASF GitHub Bot commented on TINKERPOP-1288:
-------------------------------------------

Github user okram commented on a diff in the pull request:

    https://github.com/apache/incubator-tinkerpop/pull/305#discussion_r62347949
  
    --- Diff: 
spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
 ---
    @@ -0,0 +1,178 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package 
org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.interceptor;
    +
    +import org.apache.spark.api.java.JavaPairRDD;
    +import org.apache.spark.api.java.JavaRDD;
    +import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
    +import 
org.apache.tinkerpop.gremlin.process.computer.traversal.MemoryTraversalSideEffects;
    +import 
org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
    +import org.apache.tinkerpop.gremlin.process.traversal.NumberHelper;
    +import org.apache.tinkerpop.gremlin.process.traversal.Scope;
    +import org.apache.tinkerpop.gremlin.process.traversal.Step;
    +import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
    +import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
    +import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
    +import 
org.apache.tinkerpop.gremlin.process.traversal.step.map.CountGlobalStep;
    +import org.apache.tinkerpop.gremlin.process.traversal.step.map.FoldStep;
    +import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
    +import 
org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupCountStep;
    +import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep;
    +import 
org.apache.tinkerpop.gremlin.process.traversal.step.map.MaxGlobalStep;
    +import 
org.apache.tinkerpop.gremlin.process.traversal.step.map.MeanGlobalStep;
    +import 
org.apache.tinkerpop.gremlin.process.traversal.step.map.MinGlobalStep;
    +import 
org.apache.tinkerpop.gremlin.process.traversal.step.map.SumGlobalStep;
    +import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
    +import 
org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
    +import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
    +import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
    +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
    +import org.apache.tinkerpop.gremlin.spark.process.computer.SparkMemory;
    +import 
org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.SparkVertexProgramInterceptor;
    +import org.apache.tinkerpop.gremlin.structure.Vertex;
    +import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
    +import org.apache.tinkerpop.gremlin.util.function.ArrayListSupplier;
    +import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.function.BinaryOperator;
    +
    +/**
    + * @author Marko A. Rodriguez (http://markorodriguez.com)
    + */
    +public final class SparkStarBarrierInterceptor implements 
SparkVertexProgramInterceptor<TraversalVertexProgram> {
    +
    +    public SparkStarBarrierInterceptor() {
    +
    +    }
    +
    +    @Override
    +    public JavaPairRDD<Object, VertexWritable> apply(final 
TraversalVertexProgram vertexProgram, final JavaPairRDD<Object, VertexWritable> 
inputRDD, final SparkMemory memory) {
    +        vertexProgram.setup(memory);
    +        final Traversal.Admin<Vertex, Object> traversal = 
(Traversal.Admin) vertexProgram.getTraversal().getPure().clone();
    +        final Object[] graphStepIds = ((GraphStep) 
traversal.getStartStep()).getIds();    // any V(1,2,3)-style ids to filter on
    +        final ReducingBarrierStep endStep = (ReducingBarrierStep) 
traversal.getEndStep(); // needed for the final traverser generation
    +        traversal.removeStep(0);                                    // 
remove GraphStep
    +        traversal.removeStep(traversal.getSteps().size() - 1);      // 
remove ReducingBarrierStep
    +        traversal.applyStrategies();                                // 
compile
    +        boolean identityTraversal = traversal.getSteps().isEmpty(); // if 
the traversal is empty, just return the vertex (fast)
    +        ///////////////////////////////
    +        ((MemoryTraversalSideEffects) 
traversal.getSideEffects()).setMemory(memory, true); // any intermediate 
sideEffect steps are backed by SparkMemory
    +        memory.setInExecute(true);
    +        final JavaRDD<Traverser.Admin<Object>> nextRDD = inputRDD.values()
    +                .filter(vertexWritable -> 
ElementHelper.idExists(vertexWritable.get().id(), graphStepIds)) // ensure 
vertex ids are in V(x)
    --- End diff --
    
    By "moving this class forward," I mean when we handle `g.E()` ... as you 
can see, the logic will start to get gross fast with a 4-condition branch. I 
thought it best to get it all clean and clear so its understandable whats going 
on.


> Support gremlin.spark.skipPartitioning configuration.
> -----------------------------------------------------
>
>                 Key: TINKERPOP-1288
>                 URL: https://issues.apache.org/jira/browse/TINKERPOP-1288
>             Project: TinkerPop
>          Issue Type: Improvement
>          Components: hadoop, process
>    Affects Versions: 3.2.0-incubating
>            Reporter: Marko A. Rodriguez
>            Assignee: Marko A. Rodriguez
>             Fix For: 3.2.1
>
>
> If a {{VertexProgram}} does not use message passing, then its best to NOT 
> partition after load as its pointless to do so.
> In particular, for {{TraversalVertexProgram}}, if the submitted traversal 
> does not contain a {{VertexStep}} then partitioning can be avoided. This can 
> be reasoned via a {{SparkPartitionStrategy}}, but for now, simply making the 
> configuration and having it do its job is sufficient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to