[ https://issues.apache.org/jira/browse/TINKERPOP-1288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15274237#comment-15274237 ]
ASF GitHub Bot commented on TINKERPOP-1288: ------------------------------------------- Github user okram commented on a diff in the pull request: https://github.com/apache/incubator-tinkerpop/pull/305#discussion_r62347949 --- Diff: spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java --- @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.interceptor; + +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; +import org.apache.tinkerpop.gremlin.process.computer.traversal.MemoryTraversalSideEffects; +import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram; +import org.apache.tinkerpop.gremlin.process.traversal.NumberHelper; +import org.apache.tinkerpop.gremlin.process.traversal.Scope; +import org.apache.tinkerpop.gremlin.process.traversal.Step; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.CountGlobalStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.FoldStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupCountStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.MaxGlobalStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.MeanGlobalStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.MinGlobalStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.SumGlobalStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; +import 
org.apache.tinkerpop.gremlin.spark.process.computer.SparkMemory; +import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.SparkVertexProgramInterceptor; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.util.ElementHelper; +import org.apache.tinkerpop.gremlin.util.function.ArrayListSupplier; +import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.BinaryOperator; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class SparkStarBarrierInterceptor implements SparkVertexProgramInterceptor<TraversalVertexProgram> { + + public SparkStarBarrierInterceptor() { + + } + + @Override + public JavaPairRDD<Object, VertexWritable> apply(final TraversalVertexProgram vertexProgram, final JavaPairRDD<Object, VertexWritable> inputRDD, final SparkMemory memory) { + vertexProgram.setup(memory); + final Traversal.Admin<Vertex, Object> traversal = (Traversal.Admin) vertexProgram.getTraversal().getPure().clone(); + final Object[] graphStepIds = ((GraphStep) traversal.getStartStep()).getIds(); // any V(1,2,3)-style ids to filter on + final ReducingBarrierStep endStep = (ReducingBarrierStep) traversal.getEndStep(); // needed for the final traverser generation + traversal.removeStep(0); // remove GraphStep + traversal.removeStep(traversal.getSteps().size() - 1); // remove ReducingBarrierStep + traversal.applyStrategies(); // compile + boolean identityTraversal = traversal.getSteps().isEmpty(); // if the traversal is empty, just return the vertex (fast) + /////////////////////////////// + ((MemoryTraversalSideEffects) traversal.getSideEffects()).setMemory(memory, true); // any intermediate sideEffect steps are backed by SparkMemory + memory.setInExecute(true); + final JavaRDD<Traverser.Admin<Object>> nextRDD = inputRDD.values() + .filter(vertexWritable -> 
ElementHelper.idExists(vertexWritable.get().id(), graphStepIds)) // ensure vertex ids are in V(x) --- End diff -- By "moving this class forward," I mean when we handle `g.E()` ... as you can see, the logic will start to get gross fast with a 4-condition branch. I thought it best to get it all clean and clear so it's understandable what's going on. > Support gremlin.spark.skipPartitioning configuration. > ----------------------------------------------------- > > Key: TINKERPOP-1288 > URL: https://issues.apache.org/jira/browse/TINKERPOP-1288 > Project: TinkerPop > Issue Type: Improvement > Components: hadoop, process > Affects Versions: 3.2.0-incubating > Reporter: Marko A. Rodriguez > Assignee: Marko A. Rodriguez > Fix For: 3.2.1 > > > If a {{VertexProgram}} does not use message passing, then it's best to NOT > partition after load as it's pointless to do so. > In particular, for {{TraversalVertexProgram}}, if the submitted traversal > does not contain a {{VertexStep}} then partitioning can be avoided. This can > be reasoned via a {{SparkPartitionStrategy}}, but for now, simply making the > configuration and having it do its job is sufficient. -- This message was sent by Atlassian JIRA (v6.3.4#6332)