Author: xuefu
Date: Fri May 22 18:05:27 2015
New Revision: 1681176

URL: http://svn.apache.org/r1681176
Log:
PIG-4549: Set CROSS operation parallelism for Spark engine (Mohit via Xuefu)
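
For context: the new ParallelismSetter visitor (full diff below) records one JobConf
entry per CROSS operator, keyed as PigImplConstants.PIG_CROSS_PARALLELISM + "." +
<operator key>, with the value hard-coded to 96 (GFCross.DEFAULT_PARALLELISM) until
parallelism estimation is implemented. Below is a minimal sketch of how a consumer
could read such an entry back; only the key layout comes from this commit, while the
class and method names here are hypothetical:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.pig.impl.PigImplConstants;

    public class CrossParallelismLookup {
        // Hypothetical helper, not part of this commit: resolves the per-operator
        // CROSS parallelism stored by ParallelismSetter, falling back to a
        // caller-supplied default when no entry was set.
        public static int lookup(JobConf conf, String crossKey, int defaultParallelism) {
            String value = conf.get(PigImplConstants.PIG_CROSS_PARALLELISM + "." + crossKey);
            return value == null ? defaultParallelism : Integer.parseInt(value);
        }
    }

ParallelismSetter is invoked from SparkLauncher as a plan visitor (first hunk below),
so every CROSS key collected on a SparkOperator during compilation has a matching
conf entry before the plan is converted to RDDs.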

Added:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
    pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1681176&r1=1681175&r2=1681176&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Fri May 22 18:05:27 2015
@@ -94,6 +94,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
 import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.SecondaryKeyOptimizerSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.ParallelismSetter;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -169,6 +170,8 @@ public class SparkLauncher extends Launc
                                        jobConf);
                }
 
+               new ParallelismSetter(sparkplan, jobConf).visit();
+
                byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
 
                // Create conversion map, mapping between pig operator and spark convertor
@@ -455,8 +458,8 @@ public class SparkLauncher extends Launc
 
        private void sparkPlanToRDD(SparkOperPlan sparkPlan,
                        Map<Class<? extends PhysicalOperator>, POConverter> convertMap,
-                       SparkPigStats sparkStats, JobConf jobConf) throws IOException,
-                       InterruptedException {
+                       SparkPigStats sparkStats, JobConf jobConf)
+                       throws IOException, InterruptedException {
                Set<Integer> seenJobIDs = new HashSet<Integer>();
                if (sparkPlan == null) {
                        throw new RuntimeException("SparkPlan is null.");

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java?rev=1681176&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java Fri May 22 18:05:27 2015
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.builtin.GFCross;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class ParallelismSetter extends SparkOpPlanVisitor {
+    private JobConf jobConf;
+
+    public ParallelismSetter(SparkOperPlan plan, JobConf jobConf) {
+        super(plan, new DependencyOrderWalker<SparkOperator, SparkOperPlan>(plan));
+        this.jobConf = jobConf;
+    }
+
+    @Override
+    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+        if (sparkOp instanceof NativeSparkOperator) {
+            return;
+        }
+
+        if (sparkOp.getCrossKeys() != null) {
+            for (String key : sparkOp.getCrossKeys()) {
+                jobConf.set(PigImplConstants.PIG_CROSS_PARALLELISM + "." + key,
+                        // TODO: Estimate parallelism. For now we are hard-coding GFCross.DEFAULT_PARALLELISM
+                        Integer.toString(96));
+            }
+        }
+    }
+}
\ No newline at end of file

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1681176&r1=1681175&r2=1681176&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Fri May 22 18:05:27 2015
@@ -27,6 +27,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.pig.CollectableLoadFunc;
 import org.apache.pig.IndexableLoadFunc;
 import org.apache.pig.LoadFunc;
@@ -59,8 +61,10 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.builtin.LOG;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -76,7 +80,9 @@ import org.apache.pig.impl.plan.VisitorE
  * operators
  */
 public class SparkCompiler extends PhyPlanVisitor {
-       private PigContext pigContext;
+    private static final Log LOG = LogFactory.getLog(SparkCompiler.class);
+
+    private PigContext pigContext;
 
        // The physicalPlan that is being compiled
        private PhysicalPlan physicalPlan;
@@ -121,17 +127,18 @@ public class SparkCompiler extends PhyPl
                scope = roots.get(0).getOperatorKey().getScope();
                List<PhysicalOperator> leaves = physicalPlan.getLeaves();
 
-               if (!pigContext.inIllustrator)
-                       for (PhysicalOperator op : leaves) {
-                               if (!(op instanceof POStore)) {
-                                       int errCode = 2025;
-                                       String msg = "Expected leaf of reduce physicalPlan to "
-                                                       + "always be POStore. Found "
-                                                       + op.getClass().getSimpleName();
-                                       throw new SparkCompilerException(msg, errCode,
-                                                       PigException.BUG);
-                               }
-                       }
+               if (!pigContext.inIllustrator) {
+            for (PhysicalOperator op : leaves) {
+                if (!(op instanceof POStore)) {
+                    int errCode = 2025;
+                    String msg = "Expected leaf of reduce physicalPlan to "
+                            + "always be POStore. Found "
+                            + op.getClass().getSimpleName();
+                    throw new SparkCompilerException(msg, errCode,
+                            PigException.BUG);
+                }
+            }
+        }
 
                // get all stores and nativeSpark operators, sort them in order(operator
                // id)
@@ -154,7 +161,9 @@ public class SparkCompiler extends PhyPl
                Collections.sort(ops);
 
                for (PhysicalOperator op : ops) {
-                       compile(op);
+            if (LOG.isDebugEnabled())
+                LOG.debug("Starting compile of leaf-level operator " + op);
+            compile(op);
                }
        }
 
@@ -171,6 +180,10 @@ public class SparkCompiler extends PhyPl
                        PlanException, VisitorException {
                SparkOperator[] prevCompInp = compiledInputs;
 
+        if (LOG.isDebugEnabled())
+            LOG.debug("Compiling physical operator " + op +
+                ". Current spark operator is " + curSparkOp);
+
                List<PhysicalOperator> predecessors = physicalPlan.getPredecessors(op);
                if (op instanceof PONative) {
                        // the predecessor (store) has already been processed
@@ -249,7 +262,10 @@ public class SparkCompiler extends PhyPl
        }
 
        private SparkOperator getSparkOp() {
-               return new SparkOperator(OperatorKey.genOpKey(scope));
+               SparkOperator op = new SparkOperator(OperatorKey.genOpKey(scope));
+        if (LOG.isDebugEnabled())
+            LOG.debug("Created new Spark operator " + op);
+        return op;
        }
 
        public SparkOperPlan getSparkPlan() {
@@ -557,6 +573,10 @@ public class SparkCompiler extends PhyPl
                try {
             POGlobalRearrangeSpark glbOp = new POGlobalRearrangeSpark(op);
             addToPlan(glbOp);
+            if (op.isCross()) {
+                curSparkOp.addCrossKey(op.getOperatorKey().toString());
+            }
+
             curSparkOp.customPartitioner = op.getCustomPartitioner();
             phyToSparkOpMap.put(op, curSparkOp);
                } catch (Exception e) {
@@ -682,12 +702,14 @@ public class SparkCompiler extends PhyPl
                SparkOperator ret = getSparkOp();
                sparkPlan.add(ret);
 
-               Set<SparkOperator> toBeConnected = new HashSet<SparkOperator>();
+        Set<SparkOperator> toBeConnected = new HashSet<SparkOperator>();
                List<SparkOperator> toBeRemoved = new ArrayList<SparkOperator>();
 
                List<PhysicalPlan> toBeMerged = new ArrayList<PhysicalPlan>();
 
                for (SparkOperator sparkOp : compiledInputs) {
+            if (LOG.isDebugEnabled())
+                LOG.debug("Merging Spark operator " + sparkOp);
                        toBeRemoved.add(sparkOp);
                        toBeMerged.add(sparkOp.physicalPlan);
                        List<SparkOperator> predecessors = sparkPlan

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java?rev=1681176&r1=1681175&r2=1681176&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java Fri May 22 18:05:27 2015
@@ -17,8 +17,10 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.spark.plan;
 
+import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -55,8 +57,6 @@ public class SparkOperator extends Opera
 
        public Set<PhysicalOperator> scalars;
 
-       public boolean isUDFComparatorUsed = false;
-
        public int requestedParallelism = -1;
 
     private BitSet feature = new BitSet();
@@ -71,6 +71,8 @@ public class SparkOperator extends Opera
 
        private boolean combineSmallSplits = true;
 
+       private List<String> crossKeys = null;
+
        public SparkOperator(OperatorKey k) {
                super(k);
                physicalPlan = new PhysicalPlan();
@@ -118,6 +120,17 @@ public class SparkOperator extends Opera
                v.visitSparkOp(this);
        }
 
+       public void addCrossKey(String key) {
+               if (crossKeys == null) {
+                       crossKeys = new ArrayList<String>();
+               }
+               crossKeys.add(key);
+       }
+
+       public List<String> getCrossKeys() {
+               return crossKeys;
+       }
+
        public boolean isGroupBy() {
         return feature.get(OPER_FEATURE.GROUPBY.ordinal());
     }

Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1681176&r1=1681175&r2=1681176&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java Fri May 22 18:05:27 2015
@@ -1666,8 +1666,14 @@ public class TestEvalPipeline2 {
                     return false;
                 }
             });
-            // auto-parallelism is 2 in MR, 20 in Tez, so check >=2
-            Assert.assertTrue(partFiles.length >= 2);
+
+            if (Util.isSparkExecType(cluster.getExecType())) {
+                // TODO: Fix this when we implement auto-parallelism in Spark
+                Assert.assertTrue(partFiles.length == 1);
+            } else {
+                // auto-parallelism is 2 in MR, 20 in Tez, so check >=2
+                Assert.assertTrue(partFiles.length >= 2);
+            }
             // Check the output
             Iterator<Tuple> iter = job.getResults();
             List<Tuple> results = new ArrayList<Tuple>();

